2017-05-15 34 views
1

如何更改訂戶的錯誤?我有一個從數據庫中消耗的冷流。請參閱以下情況:錯誤的可觀察更改訂戶

return coctailBundleStream 
     .doOnNext(c -> { 
      hostnames.add(c.get(KEY_HOSTNAME)); // [A] 

      sendToOutboundQueue(c.get(KEY_CREDS)); 
      archiveSentMessage(c.get(KEY_CREDS), c.get(KEY_MESSAGE_ID)); 
     }) 
     .doOnComplete(this::saveCutOffTime) 
     .doOnError(e -> informUserImpactedHostnames(hostnames, 
      theRestOfHostnamesInside(credsXmlStream, e))) // I don't think this is right 
     .onErrorResumeNext(Flowable.empty()) 
     .count(); 

我想發送受故障影響的所有主機名。但是,請參閱我上面的評論。我不認爲這是正確的,因爲流被消耗了兩次。例如如果theRestOfHostnamesInside實現是credsStream.map(c -> c.getHostname()), e)

我認爲,理想情況下,錯誤處理程序應該使用其他認購其提取主機名的其餘部分到一個列表,然後追加與以前的清單列表中繼續流(見行標與[A])。

回答

0

onErrorResumeNext應該用來提供您想要回退的流動性。

但是,主要困難是避免重複。如果源代碼很冷,您將重新執行數據庫請求,並且如果最初訂閱的序列在出現錯誤之前發送了一些數據,則會重新發送相同的數據。

您可以通過在onErrorResumeNext之後鏈接distinct(您應該能夠提供keySelector來指示如何檢測重複項)。但是您必須確保使用不會將源中的兩個元素標記爲重複的標準(以便僅消除重試創建的重複項)。

周圍的另一種方式是存儲密鑰已經處理自己的集合中,並在onErrorResumeNext過濾這些,但是你必須確保該收集特定於count()的下游製造每個subscribe ...所以不那很容易。

+0

我不喜歡重做相同的數據庫請求的想法。這對我來說似乎沒有效率。 – sancho21

+0

它取決於原始錯誤是什麼,但是你可以退回第二個序列,只查詢丟失的鍵......因爲'onErrorResumeNext'將來自錯誤序列(如果有的話)的數據與來自後備的數據連接起來順序,這符合法案 –

0

你可以這樣做只是內部的flatMap

 Observable.fromIterable(yourList) 
      .flatMap(x ->{ 
       Observable.just(x) 
         .map(data -> yourNormalSave(data)) 
         .onErrorReturn(errorResult) 
      }) 
      .subscribe(result ->{ 
         if(result != errorResult) 
          count++; 
         eles{ 
         //error received here 
         } 
      } 
      ); 

所以flatMap內的,如果你遇到一個錯誤,將其轉換爲普通類型,但下游沒有它的想法。所以你的下游用戶也會在onNext上使用它們。你可以正確地做,而不是x -> Observable.just(x)我只是把它們作爲一個例子。