2016-11-16 77 views
2

短篇小說結合觀測: 我有一個情況我有2個觀測量有一個目的:RxJava不重複執行

  • 他們收到了一些數據
  • 他們返回修改後的數據
  • 拋出如果數據無法處理,則會出錯

它們各自負責處理不同類型的數據。另外,我想在兩個數據都被處理時做一些事情。

我目前最好的實現方式如下,這些都是我的觀測量:

Single<BlueData> blueObservable = Single.create(singleSubscriber -> { 
     if (BlueDataProcessor.isDataValid(myBlueData)) { 
      singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData)); 
     } 
     else { 
      singleSubscriber.onError(new BlueDataIsInvalidThrow()); 
     } 
    }); 

    Single<RedData> redObservable = Single.create(singleSubscriber -> { 
     if (RedDataProcessor.isDataValid(myRedData)) { 
      singleSubscriber.onSuccess(RedDataProcessor.process(myRedData)); 
     } 
     else { 
      singleSubscriber.onError(new RedDataIsInvalidThrowable()); 
     } 
    }); 

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable, 
      (blueData, redData) -> PurpleGenerator.combine(blueData, redData)); 

我也有以下訂閱:

blueObservable.subscribe(
      result -> { 
       saveBlueProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

    redObservable.subscribe(
      result -> { 
       saveRedProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 


    composedSingle.subscribe(
      combinedResult -> { 
       savePurpleProcessStats(combinedResult) 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

我的問題: 藍色&紅色數據處理兩次,因爲這兩個訂閱都再次運行,我訂閱了使用Observable.zip()創建的組合observable。

如何在不同時運行兩次操作的情況下發生此行爲?

回答

2

這在1x中的Single中是不可能的,因爲沒有ConnectableSingle的概念,因此也沒有Single.publish的概念。您可以通過2.x和RxJava2Extensions庫實現該效果:

SingleSubject<RedType> red = SingleSubject.create(); 
SingleSubject<BlueType> blue = SingleSubject.create(); 

// subscribe interested parties 
red.subscribe(...); 
blue.subscribe(...); 

Single.zip(red, blue, (r, b) -> ...).subscribe(...); 

// connect() 
blueObservable.subscribe(blue); 
redObservable.subscribe(red); 
+0

我會盡快實施此解決方案,只要我可以將此項目升級到rxjava2。不過,我很高興用這個解決方案將它標記爲正確的答案。謝謝! – dbar