2017-06-21 73 views
2

比方說,我有一個sequence這樣的:RxJS - 重試或重置

Rx.Observable 
.interval(1000) 
.subscribe(data => {console.log(data)}) 

隨着operators,我怎麼能 'restart' 的順序,意味着unsubscriberesubscribe

真正的情況是,sequence是一個套接字流,當我們需要unsubscriberesubscribe,有點像retryWhen(errors)作品,但不能有錯誤一定的條件......在理想情況下是這樣的... retryWhen(bool:Subject)

回答

2

我會這樣做使用switchMap(),因爲它會自動退訂舊的Observable並訂閱新的。在這種情況下我們只用.switchMap(() => source)

const subject = new Subject(); 

const source = Observable.create(obs => { 
    console.log('Observable.create'); 
    obs.next(42); 
}); 

subject.switchMap(() => source) 
    .subscribe(v => console.log('next:', v)); 


setTimeout(() => subject.next(), 1000); 
setTimeout(() => subject.next(), 5000); 

此打印如下:

Observable.create 
next: 42 
Observable.create 
next: 42 

只是代替source你有你的WebSocket源(或任何你有)。

+0

感謝這個解決方案,但是如何在沒有主題的情況下做到這一點?那可能嗎? – Thibs