2016-02-12 43 views
1

我在我的應用程序中有2個服務。一種youTube服務,用於通過網絡加載您的管理評論線索和評論以及管理評論批量加載的評論服務。評論服務特定於我的應用程序,youTube服務與應用程序無關。如何使用一個RXJs流爲2個不同的東西

我有一個函數getCommentThreadsForChannel加載註釋線程。這個主要的實現是在youTube服務中,但是在評論服務上調用這個服務,基本上只是調用從youTube服務返回observable。

只要我的控制器調用這個,這只是一個可觀察的註釋序列。但是,在我的commentService我想要在本地存儲這些線程。我想批量存儲所有線程,只要我有100多個線程,這樣我就不會爲每一個新的數據位處理列表。我想出了這個代碼:

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     var threadStream: Rx.Observable<ICommentThread> = 
      this.youTubeService.getCommentThreadsForChannel(); 

     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

我認爲這裏配料線程和積累的所有線程進入一個陣列的邏輯是不錯,但這些代碼不會被調用。我認爲這是因爲我沒有訂閱這個線程。

我不想在這裏訂閱,因爲它會訂閱底層的流,然後我會有2訂閱,所有的數據將加載兩次(有很多數據 - 它需要大約一分鐘的時間來加載它全部和超過30個100個線程的調用)。

我基本上想要一個do這裏不會影響傳遞給控制器​​的流,但我想使用RXjs的緩衝和累積邏輯。

我相信,我需要共享或發佈以某種方式流,但我有使用這些運營商之前,不能看我怎麼會是能夠做到這一點,不添加第二訂閱收效甚微。

如何共享一個流並以兩種不同方式使用它,而無需訂閱兩次?我可以創建某種被動的流,只有當它基於訂閱的可觀察性時才訂閱嗎?

回答

1

我最終想到了這一點(有一些來自@MonkeyMagiic的幫助)。

我不得不共用流,這樣我可以在每個值做2件不同的事情的數據,緩衝區(IT)和個別。問題是這兩個流都必須訂閱,但我不想在服務中訂閱 - 這應該在控制器中完成。

的解決方案是將2個流再次合併,並從緩衝忽略值:

var intervalStream = Rx.Observable.interval(250) 
    .take(8) 
    .do(function(value){console.log("source: " + value);}) 
    .shareReplay(1); 

var bufferStream = intervalStream.bufferWithCount(3) 
    .do(function(values){ 
     console.log("do something with buffered values: " + values); 
    }) 
    .flatMap(function(values){ return Rx.Observable.empty(); }); 

var mergeStream = intervalStream.merge(bufferStream); 

mergeStream.subscribe(
    function(value){ console.log("value received by controller: " + value); }, 
    function(error){ console.log("error: " + error); }, 
    function(){ console.log("onComplete"); } 
); 

輸出:

"source: 0" 
"value received by controller: 0" 
"source: 1" 
"value received by controller: 1" 
"source: 2" 
"value received by controller: 2" 
"do something with buffered values: 0,1,2" 
"source: 3" 
"value received by controller: 3" 
"source: 4" 
"value received by controller: 4" 
"source: 5" 
"value received by controller: 5" 
"do something with buffered values: 3,4,5" 
"source: 6" 
"value received by controller: 6" 
"source: 7" 
"value received by controller: 7" 
"do something with buffered values: 6,7" 
"onComplete" 

JSBin

0

我相信你要找的是sharereplay的組合,謝天謝地RX有shareReplay(bufferSize)

var threadStream: Rx.Observable<ICommentThread> = this.youTubeService 
       .getCommentThreadsForChannel() 
       .shareReplay(1); 

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

使用組播運營商的份額將確保第一後,每個追加認購不作不必要的請求和重播,以確保所有新訂閱收到的最後通知。

+0

哈,只注意到我們爲同兩個工作公司! – MonkeyMagiic

+0

值得注意的是,現在'shareReplay'已經從最新的RxJS 5測試版中移除。 [更多信息](http://stackoverflow.com/questions/35246873/sharereplay-in-rxjs-5)。 –

+0

感謝您的回覆,但是我不敢擔心。我認爲問題在於線程流正在從該函數返回時訂閱,但bufferWithCount和掃描流未訂閱。正如我之前提到的,我不想在這裏訂閱,我只希望這在線程流訂閱時工作。 – Roaders

相關問題