2017-03-09 102 views
1

我試圖通過observable對一組內容進行流式處理,並在第一個錯誤後停止。將它看作一系列項目是簡單的,因爲它的行爲方式相同。rxjs在錯誤後立即完成了observable而不是繼續

  1. 我創建可觀察到的從項
  2. 的陣列的每個項目映射到URL
  3. 調用URL作爲請求承諾
  4. 執行一個catch()返回一個observable.empty ()中的錯誤

使用RxJS 5的事件:

rx.Observable.from(array) 
    .map(self.createUrl) 
    .flatMap(x => { 
      var options = { 
      uri: url, 
      headers: { 
       "Content-Type": "application/json" 
      }; 
      return rx.Observable.fromPromise(request-promise(options)); 
     }) 
    .catch(() => { 
    return rx.Observable.empty();}) 
    .subscribe(x => console.log('success:', x), 
      e => console.log('error'), 
      () => console.log('complete')); 

執行此序列時,代碼在遇到第一個錯誤後停止。我懷疑#4中的空觀察者正在終止觀測值,但我不知道爲什麼。

我期望的過程是無論錯誤如何處理數組中的所有項目 - 最終處理所有成功項目並在每個錯誤後恢復。

+2

這種行爲是有道理的。如果發生錯誤,原始觀測值將完成並被替換爲空值。我認爲你正在尋找['onErrorResumeNext()'](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/onerrorresumenext.md)。使用它,例如,結合'flatMap' – slezica

+0

我試圖通過包裝.from(數組)行: rx.Observable.onErrorResumeNext(rx.Observable.from(array)), 這樣做什麼都沒有。 我不知道如何將它從flatMap包裝到請求?或者我錯過了什麼? –

+0

對不起 - pebkac。將可觀察*包裹在裏面*平面地圖就是你說的。謝謝。它看起來像我還沒有解決答覆。 –

回答

3

你只需把catch()flatMap內:

rx.Observable.from(array) 
    .map(self.createUrl) 
    .flatMap(x => { 
    var options = { 
     uri: url, 
     headers: { 
     "Content-Type": "application/json" 
     } 
    }; 
    return rx.Observable 
     .fromPromise(request-promise(options)) 
     .catch(() => rx.Observable.empty()); 
    }) 
    .subscribe(x => console.log('success:', x), 
      e => console.log('error'), 
      () => console.log('complete')); 

現在,當再內可觀測發出錯誤,它會立即捕獲並通過.flatMap()不會傳播到主流。