2016-09-26 53 views
1

我是Rxjs的新手。如果可能,我想遵循最佳做法。將三個不同的函數映射到Node.js中的Observable

我正在執行三個不同的函數對可觀察到的相同數據。遵循'數據流'的概念,我一直認爲我需要將這個Observable分成三個流並繼續。

這裏是我的代碼,這樣我就可以停止抽象談論:

// NotEmptyResponse splits the stream in 2 to account based on whether I get an empty observable back. 
let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse); 

// Success Stream to perform further actions upon. 
let successStream = base_subscription[0]; 

// The Empty stream for error reporting 
let failureStream = base_subscription[1]; 

//Code works up until this point. I don't know how to split to 3 different streams. 
successStream.filter(isSite) 
      .map(grabData)// Async action that returns data 
      /*** Perform 3 separate actions upon data that .map(grabData) returned **/ 
      .subscribe(); 

我如何拆分此數據流的三成,並且數據的每個實例映射到不同的功能?

+0

你是什麼意思_「將每個數據實例映射到不同的功能」_?你想將'siteStream'分成三個Observable而不是兩個? – martin

回答

1

其實partition()運營商內部只是calls filter() operator twice。首先從匹配predicate的值創建Observable,然後創建與predicate不匹配的值。

所以你可以做同樣的事情與filter() operator

let obs1 = base_subscription.filter(val => predicate1); 
let obs2 = base_subscription.filter(val => predicate2); 
let obs3 = base_subscription.filter(val => predicate3); 

現在你有三個觀測量,他們每個人只發一些特定的值。然後,您可以與您現有的代碼進行:

obs2.filter(isSite) 
    .map(grabData) 
    .subscribe(); 

要知道,調用subscribe()從源頭上可觀察到觸發生成值。根據您使用的Observable的不同,這並不總是這樣。請參閱文檔中的「Hot」 and 「Cold」 Observables。根據您的用例,Operator connect()可能對您有用。

+0

昨天晚上我慢慢想出了這個,但你確認了我的懷疑。我會檢查出連接運營商。感謝幫助。 – calbear47