2017-10-18 56 views
0

我在如何使用RxJava表達以下方面掙扎。讓我們想象一下我的包列表中的流,我想在網絡上傳輸:使用RxJava的同步序列

PublishSubject<List<Packet>> packetsSubject = ...; 

而且我有以下傳輸功能:

public Observable<Status> transmit(Packet p) {...} 

我想傳遞一個列表,每包只要包的返回狀態是Status.OK。換句話說,如果第n個分組的傳輸是NOK,那麼不應發送第n + 1個分組。

此外,如果檢測到一個錯誤:

  • 應與包的列表中的索引中顯示一個錯誤
  • 分組的下一個清單的傳遞應該開始

感謝您閱讀我

+0

您是否嘗試使用'concat'運算符?對於錯誤處理,新列表是否與舊列表有任何關係? – masp

+0

這兩個列表沒有任何共同之處,我通過切分請求負載來獲取數據包列表。 concat操作符如何適合圖片? – n1r3

+0

我認爲'concat'的請求發生的順序與列表中的順序相同。如果一個失敗,那麼你不會嘗試下一個項目。 – masp

回答

1

您可以使用flatMap()運算符將列表轉換爲可觀察條目。使用zipWith()運算符將每個列表條目與單個列表中數據包的索引配對。 flatMap()再次發送並獲得結果狀態。將一個NOK變成一個throwable,並終止內部觀察鏈。在使用flatMap()運算符時,添加最終參數1會使映射一次發生,因此在此觀察鏈中沒有請求重疊。

packetsSubject 
     .flatMap(listOfPackets -> Observable.from(listOfPackets) 
      .zipWith(Observable.range(1, LARGE_NUMBER), (p, i) -> new Pair<>(p, i)) 
      .flatMap(pair -> transmits(pair.getFirst()) 
        .flatMap(status -> status.equals("OK") 
          ? Observable.empty() 
          : Observable.error(new IllegalStateException())) 
        .doOnError(error -> System.out.println("Error at index " + pair.getSecond())) 
        .onErrorResumeNext(Observable.empty()), 1)) 
    .subscribe(); 

雖然我對NOK映射到錯誤,記錄它,然後重新映射結果返回的最後步驟,內部不舒服,有它的一個原因。 listOfPackets的處理必須在處理完所有數據包或看到Status.NOK時終止;將Status.NOK轉換爲錯誤指示符即可完成此操作。但是,我們只希望單個列表的處理終止,而不是整個可觀察的。

+0

flatMap()那狗屎!謝謝,非常有幫助。 – n1r3