2016-07-27 84 views
0

我想輪詢更改,並在達到所需值時Observable應完成(或等到超時)。現在我使用可以正常工作的過濾器,直到達到所需的值。但是我希望Observable在等待這個值時推送事件。RxJs直到達到期望值的可觀察間隔

例如,我等待狀態爲'成功',直到狀態變爲'成功',狀態'測試'從我的服務中返回。但由於過濾器正在等待'成功','測試'永遠不會返回。

我現在代碼:

return Observable 
    .interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .filter(data => this.finishedStatus(data.status)) 
    .take(1) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout'))); 

回答

0

你可能想takeWhile,而不是filter

return Observable 
    .interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .takeWhile(data => this.finishedStatus(data.status)) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')); 

注意上述通吃除了的最後一個事件,如果你想在最後一場比賽太你需要是有點麻煩。

const source = Observable.interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .share(); 

    source 
    .takeUntil(source.filter(data => this.finishedStatus(data.status))) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')); 

在這種情況下,你正在服用的所有結果,直到另一個Observable發出,在這種情況下,其他Observable僅僅是過濾而只發射成功事件的source

JsBin:http://jsbin.com/sojosuhune/edit?html,js,console,output