2017-04-16 41 views
0

我是RxJava的新手,我知道flatmaps是用於映射一個發射項目到observable。我也知道根據documentation發射的觀測值全部被合併(變平)成單個可觀測的流。RxJava平面地圖:當一個結果可觀察到完成時會發生什麼?

我想知道如果任何內部可觀察結果完成會發生什麼?

例如:我有一個observable,它發出一個item數據鍵。我不得不做另一個異步http調用來從服務器獲取項目數據,所以我使用另一個observable來調用它。我使用平面地圖連接這兩個並創建一個主觀察。

什麼時候調用「SomeMethodThatWantsItems」之後的run()方法?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine) 
{ 
    Consumer<Item> onNextConsumer = 
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word"); 
    searchObservable 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(new Consumer<Item>(){ 
          @Override 
          public void accept(@NonNull Item item) throws Exception { 
           //Do stuff with the item 
          } 
         } 
       , new Consumer<Exception>() { //some implementation of onErrorConsumer 
        } 
       //OnComplete 
       , new Action(){ 

         @Override 
         public void run() throws Exception { 
          //When does this get called??? after the search complete or when the first http call is successful? 
         } 
        }); 

} 

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord) 
{ 
    return Observable.create(new ObservableOnSubscribe<String>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception { 

      //Assume that our search engine call onFind everytime it finds something 
      searchEngine.addSearchListener(new searchEngineResultListener(){ 
       @Override 
       public void onFind(String foundItemKey){ 
        emitter.onNext(foundItemKey); 
       } 

       @Override 
       public void onFinishedFindingResults(){ 
        emitter.onComplete(); 
       } 
      }); 

     } 
    }); 
} 

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key) 
{ 

    return Observable.create(new ObservableOnSubscribe<Item>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception { 

      //Call the server to get the item 
      httpCaller.call(key, new onCompleteListener(){ 
       @Override 
       public void onCompletedCall(Item result) 
       { 
        emitter.onNext(result); 
        //The result is complete! end the stream 
        emitter.onComplete(); 
       } 
      }); 
     } 
    }); 
} 

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){ 
    //Where everything comes together 
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord); 
    retuern searchResultObservable 
      .observeOn(Schedulers.newThread()) 
      .flatMap(new Function<String, Observable<Item>>(){ 
       @Override 
       public Observable<Item> apply(String key){ 
        return getItemByKey(httpCaller, key); 
       } 
      }); 
} 

回答

3

onComplete()總是調用一次,然後流停止。 (這是Observable Contract的一部分)。
這意味着,在您的情況下,您的onComplete()SomeMethodThatWantsItems將被調用後所有項目被檢索。
flatMap()情況下,每個內Observable完成後,簡單地將信號源Observable停止從內Observable平光項到源ObservableflatMap()從內Observable合併項只要這個流發送物品,所以它基本上將整個內部Observable流消耗到源流中,整個流直到終止事件3如onComplete(),因此在內部Observable可以發射多於1個項目的情況下,這意味着它將在源流上產生多於1個發射。

相關問題