2011-05-19 49 views
11

如果發生錯誤,是否有方法讓可觀察序列繼續執行,以及序列中的下一個元素? 從this post看起來您需要在Catch()中指定一個新的可觀察序列以恢復執行,但是如果您需要繼續使用序列中的下一個元素繼續處理,那該怎麼辦?有沒有辦法做到這一點?使用Rx處理可觀察序列中的錯誤

更新: 該場景如下: 我有一堆我需要處理的元素。處理由一堆步驟組成。我有 將這些步驟分解成我想編寫的任務。 我遵循ToObservable()的指導原則here 將任務轉換爲構成的可觀察對象。 所以基本上我在做somethng像這樣 -

foreach(element in collection) 
{ 
    var result = from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 
    result.subscribe(register on next and error handlers here) 
} 

我也可以是這樣的:

var result = 
     from element in collection.ToObservable() 
     from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 

什麼是這裏繼續處理其他的元素,即使我們說的處理的最佳方式 其中一個元素引發異常。我希望能夠記錄錯誤並理想地繼續前進。

回答

1

IObservableIObserver之間的合同是OnNext*(OnCompelted|OnError)?,即使不是來源,所有運營商都支持該合同。

你唯一的選擇就是重新訂閱使用Retry源,但如果源返回IObservable實例每一個描述你將不會看到任何新的價值。

您能否提供有關您的方案的更多信息?也許有另一種方式來看待它。

編輯:基於更新後的反饋,這聽起來像你只需要Catch

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>()) 
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>()) 
    select cResult; 

這將替換與Empty一個錯誤,就不會觸發下一個序列(因爲它使用SelectMany

+0

我已更新該帖子以包含我正在嘗試完成的場景 – 2011-05-20 21:08:25

11

James James &理查德提出了一些很好的觀點,但我認爲他們沒有給你最好的方法來解決你的問題

James建議使用.Catch(Observable.Never<Unit>())。當他說「將...允許流繼續」時,他錯了,因爲一旦你遇到異常,流必須結束 - 這是理查德在提到觀察者和觀察者之間的合同時指出的。

此外,以這種方式使用Never將導致您的觀測值永遠不會完成。

簡而言之,.Catch(Observable.Empty<Unit>())是將序列從一個以錯誤結尾的序列更改爲以完成結束的序列的正確方法。

您已經達到了使用SelectMany來處理源集合的每個值的正確想法,以便您可以捕獲每個異常,但是您留下了一些問題。

您正在使用任務(TPL)只是將函數調用轉換爲可觀察值。這會強制您的observable使用任務池線程,這意味着SelectMany語句可能會以非確定性順序生成值。

此外,您還隱藏實際調用來處理數據,使重構和維護變得更困難。

我認爲你最好創建一個允許跳過異常的擴展方法。那就是:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return 
     source 
      .Select(t => 
       Observable.Start(() => selector(t)).Catch(Observable.Empty<R>())) 
      .Merge(); 
} 

用這種方法你現在可以簡單地這樣做:

var result = 
    collection.ToObservable() 
     .SelectAndSkipOnException(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }); 

這段代碼要簡單得多,但它隱藏了異常(S)。如果你想繼續讓異常繼續下去,那麼你需要做一些額外的功能。在Materialize擴展方法中增加一些重載可以保持錯誤。

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector); 
} 

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector) 
{ 
    Func<Notification<T>, Notification<R>> f = nt => 
    { 
     if (nt.Kind == NotificationKind.OnNext) 
     { 
      try 
      { 
       return Notification.CreateOnNext<R>(selector(nt.Value)); 
      } 
      catch (Exception ex) 
      { 
       ex.Data["Value"] = nt.Value; 
       ex.Data["Selector"] = selector; 
       return Notification.CreateOnError<R>(ex); 
      } 
     } 
     else 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       return Notification.CreateOnError<R>(nt.Exception); 
      } 
      else 
      { 
       return Notification.CreateOnCompleted<R>(); 
      } 
     } 
    }; 
    return source.Select(nt => f(nt)); 
} 

這些方法允許你這樣寫:

var result = 
    collection 
     .ToObservable() 
     .Materialize(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }) 
     .Do(nt => 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       /* Process the error in `nt.Exception` */ 
      } 
     }) 
     .Where(nt => nt.Kind != NotificationKind.OnError) 
     .Dematerialize(); 

你甚至可以鏈接這些Materialize方法和使用ex.Data["Value"] & ex.Data["Selector"]得到拋出錯誤出來的價值和選擇功能。

我希望這會有所幫助。

+0

我在嘗試使用observables觀察值時遇到類似問題。當一個內部observable拋出一個OnError時,外部可觀察的視圖會隨之移動到OnError - 從而導致一切關閉。我已經嘗試了捕獲異常並拋出OnCompleted的解決方案,但是這產生了與OnCompleted和OnError完全相同的行爲,導致訂閱關閉 – letstango 2014-04-15 23:18:05