2017-04-25 63 views
2

這個可觀察的輪詢每秒鐘調用getPromise()函數。在getPromise()函數返回3個promise後,它停止解析它們。如何檢測getPromise()函數沒有解決/拒絕過去的任何承諾,比如說2秒,然後調用onError處理函數。我試圖讓它與timeout運營商合作無濟於事。有任何想法嗎?如何讓可觀察的輪詢檢測到未完成的承諾

Rx.Observable.interval(1000) 
 
    .switchMap(() => Rx.Observable.fromPromise(getPromise())) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

回答

2

可以使用race操作,訂閱只發出第一觀察到。

你說你想在2個不活動狀態之後調用onError處理程序。這與使用switchMap相矛盾,當從回調中返回新的Observable時,它自動取消訂閱。所以你可能想用exhaustMap代替。此外,當您發出錯誤通知時,該鏈會取消訂閱,您將永遠不會收到任何其他值。這意味着您不應該將超時設置爲error或使用retry運算符自動重新訂閱(但這實際上取決於您要實現的內容)。

這是您更新的示例,該示例僅使用race()運算符。

Rx.Observable.interval(1000) 
 
    .switchMap(() => 
 
    Rx.Observable.race(
 
     Rx.Observable.fromPromise(getPromise()), 
 
     Rx.Observable.timer(0, 1000).mapTo(42) 
 
    ) 
 
) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

編輯:2秒不活動之後發送單個錯誤通知。

Rx.Observable.interval(1000) 
 
    .switchMap(() => Rx.Observable.fromPromise(getPromise())) 
 
    .timeout(2000) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>

有真的在5.3.0中的錯誤不能直接在timeout()運營商,但在調度異步操作。 https://github.com/ReactiveX/rxjs/pull/2580

沒有timeout()操作:

Rx.Observable.interval(1000) 
    .switchMap(() => 
    Rx.Observable.race(
     Rx.Observable.fromPromise(getPromise()), 
     Rx.Observable.timer(0, 2000).map(function(_) { 
     throw new Error('timeout'); 
     }) 
    ) 
) 
    .subscribe(onValue, onError); 
+0

嘿,這是一個很好的解決方案!在獲得第42名後,你會如何取消訂閱?我不需要其餘的人。 –

+0

這個可觀察到的應該在2s過去之後取消寫入,並且沒有發射任何值。 –

+0

@EugeneEpifanov查看我使用'timeout()'引發錯誤的更新。它實際上看起來像在'timeout()'運算符中的RxJS 5.3.0中存在一個錯誤。使用RxJS 5.2.0它的工作原理應該是我想的。最近對'timeout()'運算符進行了更改,但是https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#530-2017-04-03 – martin