2017-08-24 77 views
1

摘要:我正在使用Rxjs和新手。我想實現這樣一個可觀察的情景,但迄今爲止還沒有運氣。rxjs緩衝區,直到超時重新設置新的arg到達

有一個函數loadDetailsFromServer(itemIds),它調用服務器API並傳遞一些項目ID。這個功能被稱爲零星的。爲了優化服務器調用,下面是我想要做的事情: 隨着第一個函數調用的到來,超時被觸發。如果超時之前有任何新的函數調用到達,timout將被重置爲重新啓動。當超時啓動時,進行服務器調用,並且參數計數重置爲零。

這裏是一個大理石ISH圖:

Timer is 4 clicks. 
INPUTS IN TIME  1-2---3-4-----5--------6-7-------- 
loadDetailsFromServer [1,2,3,4] -  [5]   -[6,7] 

function called with [1,2,3,4] because no more calls after 4 clicks. 

提示:這是類似的搜索框樣本,並從服務器得到的結果,除了中間值感興趣,而不是忽略/跳過。

回答

0

例如,如果你有源可觀察到這樣的:

const Rx = require('rxjs/Rx'); 
const Observable = Rx.Observable; 

const TIMEOUT = 1000; 

const source = Observable.range(1, 20) 
    .concatMap(v => Observable.of(v).delay(Math.random() * 2000)); 

然後你就可以使用scan緩衝它的價值。重置緩衝區我正在使用.merge(bufferNotifier.mapTo(null))。然後switchMap()我總是等待012ms發射1000ms。如果沒有它的另一個觀察的「覆蓋」,因爲新到達緩衝器:

const bufferNotifier = new Subject(); 

const chain = source 
    .do(undefined, undefined,() => bufferNotifier.complete()) // properly complete the chain 
    .merge(bufferNotifier.mapTo(null)) // reset buffer Subject 
    .scan((acc, val) => { 
     if (val === null) { 
      return []; 
     } 
     acc.push(val); 
     return acc; 
    }, []) 
    .filter(arr => arr.length > 0) 
    .switchMap(buffer => { // wait 1s until emitting the buffer further 
     return Observable.forkJoin(
      Observable.of(buffer), 
      Observable.timer(1000).take(1), 
      arr => arr 
     ); 
    }) 
    .do(() => bufferNotifier.next()) // trigger reset the buffer 
    .subscribe(console.log); 

此輸出例如:

[ 1 ] 
[ 2 ] 
[ 3, 4 ] 
[ 5 ] 
[ 6, 7 ] 
[ 8, 9, 10, 11, 12 ] 
[ 13 ] 
[ 14, 15 ] 
[ 16 ] 
[ 17 ] 
[ 18 ] 
[ 19, 20 ] 
0

如果你也有類似的source可觀察到馬丁的回答,這樣的事情可以工作:

source 
    .buffer(source.debounceTime(250)) 
    .subscribe(console.log); 

buffer收集所有發出的值,直到給定的觀察到的發射。在這種情況下,它會等到debounceTime發出。 CodePen:https://codepen.io/anon/pen/PKBaZm?editors=1010