2
我有一個事件流,我想調用一個函數來返回每個事件的承諾,問題是這個函數非常昂貴,所以我想一次最多處理n個事件。如何一次處理RxJS流n項目,一旦項目完成後,又自動填充回n?
這卵石圖可能是錯的,但是這是我想什麼:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
這是代碼,我到目前爲止有:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
有此代碼的兩個問題:
- 它使用了
inProgressCount
變量,它是用邊 更新效果函數秒。 完成$訂閱僅在請求超過1個受控流的項目時調用一次。這使得inProgressCount
var更新不正確,這最終將隊列限制爲一次一個。
你可以看到它在這裏工作: http://jsbin.com/wivehonifi/1/edit?js,console,output
問題:
- 有沒有更好的方法嗎?
- 我該如何擺脫
inProgressCount
變量? 爲什麼完成$訂閱只需要在請求多個項目時調用一次?
更新:
回答問題#3:switchMap是一樣的flatMapLatest,所以這就是爲什麼我只獲得了最後一個。將代碼更新爲flatMap而不是switchMap。
在RxJs 5您必須.mergeMap(映射器(resultSelectorFunction更換.flatMapWithMaxConcurrent | NULL),併發)[mergeMap](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap) – user3717718