2017-06-29 44 views
0

下面的代碼確實網絡調用正常。但它不斷髮出相同的結果。我想只取得第一個結果並停止排放。有沒有我能說的發出第一個命令。我試圖採取(1),但由於某種原因改變了結果的大小。Rxjava2 - 如何讓觀察者在第一次停止發射物品或使用zipWith?

//class variables 
    FeedsModel feedsModelResult; 
    HashMap<Integer, ProductModel> productMap; 

//method 

    @Override 
     protected Observable buildUseCaseObservable() { 
      /* gets feedModel then parses through each feed for product IDs. then does a network call to get each product. stores retrieved 
      product model in hashmap for quick retrieval. returns a pair. 
      */ 
      return feedRepository.fetchFeeds(shopId, langId, skip) 
        .concatMap(new Function<FeedsModel, ObservableSource<List<Feed>>>() { 
         @Override 
         public ObservableSource<List<Feed>> apply(@NonNull final FeedsModel feedsModel) throws Exception { 
          feedsModelResult = feedsModel; 
          return Observable.fromCallable(new Callable<List<Feed>>() { 
           @Override 
           public List<Feed> call() throws Exception { 

            return feedsModel.getFeed(); 
           } 
          }); 
         } 
        }) 
        .concatMap(new Function<List<Feed>, ObservableSource<Feed>>() { 
         @Override 
         public ObservableSource<Feed> apply(@NonNull List<Feed> feeds) throws Exception { 

          return Observable.fromIterable(feeds); 
         } 
        }).filter(new Predicate<Feed>() { 
         @Override 
         public boolean test(@NonNull Feed feed) throws Exception { 
          return feed.getProducts() != null; 
         } 
        }) 
        .concatMap(new Function<Feed, ObservableSource<Double>>() { 
         @Override 
         public ObservableSource<Double> apply(@NonNull Feed feed) throws Exception { 
          return Observable.fromIterable((ArrayList<Double>) feed.getProducts()); 
         } 
        }) 
        .concatMap(new Function<Double, ObservableSource<ProductModel>>() { 
         @Override 
         public ObservableSource<ProductModel> apply(@NonNull Double productId) throws Exception { 
          return productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency); 
         } 
        }).concatMap(new Function<ProductModel, ObservableSource<Map<Integer, ProductModel>>>() { 
         @Override 
         public ObservableSource apply(@NonNull ProductModel productModel) throws Exception { 

          productMap.put(productModel.getIdProduct(), productModel); 
          return Observable.fromCallable(new Callable<Map<Integer, ProductModel>>() { 
           @Override 
           public Map<Integer, ProductModel> call() throws Exception { 
            return productMap; 
           } 
          }); 
         } 
        }).concatMap(new Function<Map<Integer, ProductModel>, ObservableSource<Pair>>() { 
         @Override 
         public ObservableSource apply(@NonNull final Map<Integer, ProductModel> productModelMap) throws Exception { 
          return Observable.fromCallable(new Callable() { 
           @Override 
           public Object call() throws Exception { 
            return Pair.create(feedsModelResult, productMap); 
           } 
          }); 
         } 
        }); 
     } 

UPDATE: 在onSubscribe我一直到一次性的參考和onNext對其進行處理()後,我得到的第一個結果。這種做法有效嗎?

最後調用的結果Pair.create(feedsModelResult,productMap);我想我應該使用zipWith運算符來等待所有的結果完成,但我不知道如何

回答

1

瞭解您的流程並不容易,但似乎您正在嘗試查詢某個FeedsModel對象,併發出單個Pair值包裝此FeedsModel對象以及其包含的其內部產品對象的某些收集的映射。

問題在於你的Observable可能在每個內部列表中展開,並試圖沿着地圖的方式收集它們,而流仍然會將這些項目發送給最終用戶。

你需要的是一個流是得到一個輸入FeedsModel,收集所有的項目地圖和僅發射單一項目這是導致地圖,那麼你就可以連同輸入FeedsModel與一起配對本地圖結果你剛收集的地圖。
假設您的feedRepository.fetchFeeds只能返回單個FeedsModel項目(您可以使用Single<FeedsModel>對其進行優化),那麼您將在此處獲得單一結果。

一個建議是使用flatMap的變體,其包括結果選擇:

feedRepository.fetchFeeds(shopId, langId, skip) 
.flatMap(feedsModel -> 
     getProductsMapFromFeedsModelObservable(feedsModel, shopId, langId, currency) 
     ,(feedsModel, productsMap) -> 
       Pair.create(feedsModel, productsMap) 
     ); 

getProductsMapFromFeedsModelObservable()Observable,收集產品,以產品地圖從輸入FeedsModel:

private Observable<HashMap<Integer, ProductModel>> getProductsMapFromFeedsModelObservable(
      FeedsModel feedsModel, int shopId, int langId, int currency) { 
     return Observable.fromIterable(feedsModel.getFeed()) 
       .filter(feed -> feed.getProducts() != null) 
       .flatMapIterable(feed -> feed.getProducts()) 
       .flatMap(productId -> productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency)) 
       .reduce(new HashMap<Integer, ProductModel>(), 
         (productsMap, productModel) -> { 
          productsMap.put(productModel.getIdProduct(), productModel); 
          return productsMap; 
         }) 
       .toObservable(); 
    } 

reduce()是用來收集所有物品來映射和發射單個物品,flatMap()在這裏使用,因爲你可以從並行執行獲得,concat()可能不需要反正,不確定爲什麼它在第一位(維持秩序?)。

相關問題