2017-10-04 104 views
1

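I'm having trouble running RxJava observables on a new thread. I'm trying to run zipped DB queries on Schedulers.io(), but they block the UI thread anyway, and I don't know why. The Observable still runs on the UI thread.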

The DB code:

public Observable<List<T>> searchModelsInView(View view, final Collection<String> searchTerms, 
     KeyMatchLevel keyMatchLevel, int queryLim) { 
    return Observable.just(
      searchModelsTask(view, new HashSet<>(searchTerms), keyMatchLevel, queryLim)); 
} 

Here is the code where the zipping happens:

public Observable<Set<Airport>> getSearchResultsForAll(String term, int queryLim) { 
    List<String> terms = CollectionsUtil.asArrayList(term); 
    Observable<List<Airport>> macObservable = onNewThread(
      searchModelsInView(getMacView(), terms, 
        KeyMatchLevel.PREFIXED, queryLim)); 
    Observable<List<Airport>> nameObservable = onNewThread(
      searchModelsInView(getNameView(), terms, 
        KeyMatchLevel.PREFIXED, queryLim)); 
    Observable<List<Airport>> codeObservable = onNewThread(
      searchModelsInView(getCodeView(), terms, 
        KeyMatchLevel.PREFIXED, NONE)); 
    Observable<List<Airport>> regionObservable = onNewThread(
      searchModelsInView(getCityView(), terms, 
        KeyMatchLevel.PREFIXED, NONE)); 

    return Observable.zip(macObservable, nameObservable, codeObservable, regionObservable, 
      (macList, nameList, codeList, regionList) -> { 

       Set<Airport> resultSet = new LinkedHashSet<Airport>(); 

       Airport mac = macList.get(0); 

       if (term.length() > CODE_LEN) { 
        if (lowerCase(mac.getCity()).contains(lowerCase(term))) { 
         handleMACFound(resultSet, mac, regionList); 
         return resultSet; 
        } 
        resultSet.addAll(nameList); 
        resultSet.addAll(regionList); 
        return resultSet; 
       } 


       boolean macFound = StringUtil.isEquals(lowerCase(term), 
         lowerCase(mac.getCode())); 

       if (macFound) { 
        handleMACFound(resultSet, mac, codeList); 
        return resultSet; 
       } 

       resultSet.addAll(codeList); 
       resultSet.addAll(nameList); 
       resultSet.addAll(regionList); 

       return resultSet; 
      }); 
} 

private Observable<List<Airport>> onNewThread(Observable<List<Airport>> observable) { 
    return observable.subscribeOn(Schedulers.io()); 
} 

This is how it's called:

public void searchKey(String searchTerms, Airport... excluding) { 
    TBDataBase.getAirportDB() 
      .getSearchResultsForAll(searchTerms, MAX_AIRPORT_SEARCHED) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(airports -> { 
       setResults(airports, excluding); 
       listener.onRegularAirportSearch(); 
       listener.onSearchPerformed(); 
      }); 


} 

It seems like it should be running on Schedulers.io(), but it blocks the main thread, so evidently it isn't.

+0

Could it be setResults in the subscribe block? Is it blocked permanently, or only temporarily while getSearchResultsForAll is running?

+0

@HansWurst It is blocked temporarily. Once the DB queries finish, it goes back to normal.

Answer

2

It looks like the problem is with your searchModelsTask call:

public Observable<List<T>> searchModelsInView(View view, final Collection<String> searchTerms, 
     KeyMatchLevel keyMatchLevel, int queryLim) { 
    return Observable.just(
      searchModelsTask(view, new HashSet<>(searchTerms), keyMatchLevel, queryLim)); 
} 

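Observable.just expects a value you already have, so searchModelsTask is executed before the observable is even created, on whatever thread calls searchModelsInView (here, the UI thread). That's why putting the observable on the I/O scheduler afterwards makes no difference. Try using defer with just inside it: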

public Observable<List<T>> searchModelsInView(View view, final Collection<String> searchTerms, 
     KeyMatchLevel keyMatchLevel, int queryLim) { 
    return Observable.defer(() -> Observable.just(
      searchModelsTask(view, new HashSet<>(searchTerms), keyMatchLevel, queryLim))); 
} 

Or, without a lambda (using Func0):

public Observable<List<T>> searchModelsInView(View view, final Collection<String> searchTerms,
      KeyMatchLevel keyMatchLevel, int queryLim) {
    return Observable.defer(new Func0<Observable<List<T>>>() {
        @Override
        public Observable<List<T>> call() {
            return Observable.just(
                searchModelsTask(view, new HashSet<>(searchTerms), keyMatchLevel, queryLim));
        }
    });
}

You could also consider using Observable.fromCallable instead of defer.
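
For anyone who wants to see the effect in isolation, here is a minimal plain-Java sketch of the same pitfall. It deliberately does not use RxJava: a Callable stands in for the observable and a single-thread executor stands in for Schedulers.io(). The names EagerVsDeferred, slowQuery, eager, deferred and runOnWorker are all made up for this illustration; slowQuery is a hypothetical stand-in for searchModelsTask that just reports the thread it ran on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EagerVsDeferred {

    // Hypothetical stand-in for searchModelsTask: returns the name of the
    // thread that actually did the work.
    static String slowQuery() {
        return Thread.currentThread().getName();
    }

    // Analogous to Observable.just(slowQuery()): the argument is evaluated
    // right here, by the caller, before the Callable even exists.
    static Callable<String> eager() {
        final String value = slowQuery();
        return () -> value;
    }

    // Analogous to Observable.defer(...) or fromCallable(...): nothing runs
    // until the Callable is invoked, so the invoking thread does the work.
    static Callable<String> deferred() {
        return () -> slowQuery();
    }

    // Stand-in for subscribeOn: invoke the task on a worker thread named
    // "io-worker" and wait for its result.
    static String runOnWorker(Callable<String> task) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("eager ran on:    " + runOnWorker(eager()));
        System.out.println("deferred ran on: " + runOnWorker(deferred()));
    }
}
```

The eager version reports the thread that called eager(), even though the Callable is later handed to the worker, while the deferred version reports io-worker. That is the same reason subscribeOn(Schedulers.io()) has no effect on work done inside the argument to Observable.just.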

+0

Thanks a lot! This is the correct answer!

+0

@OmerOzer Awesome :)
