可以使用race
操作,訂閱只發出第一觀察到。
你說你想在2個不活動狀態之後調用onError
處理程序。這與使用switchMap
相矛盾,當從回調中返回新的Observable時,它自動取消訂閱。所以你可能想用exhaustMap
代替。此外,當您發出錯誤通知時,該鏈會取消訂閱,您將永遠不會收到任何其他值。這意味着您不應該將超時設置爲error
或使用retry
運算符自動重新訂閱(但這實際上取決於您要實現的內容)。
這是您更新的示例,該示例僅使用race()
運算符。
Rx.Observable.interval(1000)
.switchMap(() =>
Rx.Observable.race(
Rx.Observable.fromPromise(getPromise()),
Rx.Observable.timer(0, 1000).mapTo(42)
)
)
.subscribe(onValue, onError);
function onValue(value){
console.log('value: ', value);
}
function onError(error){
console.log('error: ', error);
}
var getPromise = (function(){
var counter = 3;
return function(){
return new Promise(function(resolve, reject){
if(counter > 0) resolve(1);
counter--;
})
}
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
編輯:2秒不活動之後發送單個錯誤通知。
Rx.Observable.interval(1000)
.switchMap(() => Rx.Observable.fromPromise(getPromise()))
.timeout(2000)
.subscribe(onValue, onError);
function onValue(value){
console.log('value: ', value);
}
function onError(error){
console.log('error: ', error);
}
var getPromise = (function(){
var counter = 3;
return function(){
return new Promise(function(resolve, reject){
if(counter > 0) resolve(1);
counter--;
})
}
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>
有真的在5.3.0中的錯誤不能直接在timeout()
運營商,但在調度異步操作。 https://github.com/ReactiveX/rxjs/pull/2580
沒有timeout()
操作:
Rx.Observable.interval(1000)
.switchMap(() =>
Rx.Observable.race(
Rx.Observable.fromPromise(getPromise()),
Rx.Observable.timer(0, 2000).map(function(_) {
throw new Error('timeout');
})
)
)
.subscribe(onValue, onError);
嘿,這是一個很好的解決方案!在獲得第42名後,你會如何取消訂閱?我不需要其餘的人。 –
這個可觀察到的應該在2s過去之後取消寫入,並且沒有發射任何值。 –
@EugeneEpifanov查看我使用'timeout()'引發錯誤的更新。它實際上看起來像在'timeout()'運算符中的RxJS 5.3.0中存在一個錯誤。使用RxJS 5.2.0它的工作原理應該是我想的。最近對'timeout()'運算符進行了更改,但是https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#530-2017-04-03 – martin