2016-07-26 75 views
2

我有一個事件流,我想調用一個函數來返回每個事件的承諾,問題是這個函數非常昂貴,所以我想一次最多處理n個事件。如何一次處理RxJS流n項目,一旦項目完成後,又自動填充回n?

這卵石圖可能是錯的,但是這是我想什麼:

---x--x--xxxxxxx-------------x-------------> //Events 
---p--p--pppp------p-p-p-----p-------------> //In Progress 
-------d--d--------d-d-dd------dddd--------> //Promise Done 

---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES 

這是代碼,我到目前爲止有:

var n = 4; 
var inProgressCount = 0; 

var events$ = Rx.Observable.fromEvent(produceEvent, 'click') 
    .map((ev) => new Date().getTime()); 

var inProgress$ = events$.controlled(); 

var done$ = inProgress$  
    .tap(() => inProgressCount++) 
    .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp))); 

done$.subscribeOnNext((timestamp) => { 
    inProgressCount--; 
    inProgress$.request(Math.max(1, n - inProgressCount)); 
}); 

inProgress$.request(n); 

有此代碼的兩個問題:

  1. 它使用了inProgressCount變量,它是用邊 更新效果函數秒。
  2. 完成$訂閱僅在請求超過1個受控流的項目時調用一次。這使得inProgressCount var更新不正確,這最終將隊列限制爲一次一個。

你可以看到它在這裏工作: http://jsbin.com/wivehonifi/1/edit?js,console,output

問題:

  1. 有沒有更好的方法嗎?
  2. 我該如何擺脫inProgressCount變量?
  3. 爲什麼完成$訂閱只需要在請求多個項目時調用一次?

更新:
回答問題#3:switchMap是一樣的flatMapLatest,所以這就是爲什麼我只獲得了最後一個。將代碼更新爲flatMap而不是switchMap。

回答

3

實際上根本不需要使用背壓。有一個名爲flatMapWithMaxConcurrent的運營商爲您提供此服務。它本質上是調用.map().merge(concurrency)的別名,它一次只允許最大數量的流在運行。

我更新了jsbin這裏:http://jsbin.com/weheyuceke/1/edit?js,output

但我註釋以下重要的一點:

const concurrency = 4; 

var done$ = events$ 
    //Only allows a maximum number of items to be subscribed to at a time 
    .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) => 
     //This overload of `fromPromise` defers the execution of the lambda 
     //until subscription      
     Rx.Observable.fromPromise(() => { 
     //Notify the ui that this task is in progress         
     updatePanelAppend(inProgress, timestamp); 
     removeFromPanel(pending, timestamp); 
     //return the task 
     return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp) 
    })); 
+1

在RxJs 5您必須.mergeMap(映射器(resultSelectorFunction更換.flatMapWithMaxConcurrent | NULL),併發)[mergeMap](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap) – user3717718

相關問題