2011-02-28 50 views
2

我有一個IEnumerable序列,它包含一些阻塞網絡操作(在下面的示例代碼中用一些簡單的yield代替)。我正在使用Reactive Extensions將通過網絡傳輸的數據流轉換爲可觀察的序列。跨線程編組異常(反應式擴展)

我正在尋找一種方法將異常封送到主線程,以便未處理的異常不會導致我的應用程序終止。我不能在IEnumerable線程上放置try/catch塊,因爲編譯器不允許try/catch語句中的yield return語句。

using System; 
using System.Collections.Generic; 
using System.Concurrency; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ConsoleApplication7 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      try 
      { 
       Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId); 
       var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations 

       // Use subject because we need many subscriptions to a single data source 
       var subject = new Subject<int>(); 

       subject.Subscribe(x => Console.WriteLine("Subscriber1: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId), 
        x => Console.WriteLine("Subscriber1 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId), 
        () => Console.WriteLine("Subscriber1 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId)); 
       subject.Subscribe(x => Console.WriteLine("Subscriber2: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId), 
        x => Console.WriteLine("Subscriber2 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId), 
        () => Console.WriteLine("Subscriber2 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId)); 

       Console.WriteLine("Press key to start receiving data"); 
       Console.ReadKey(); 
       var sub = observable.Subscribe(subject); 

       Console.WriteLine("Press key to exit"); 
       Console.ReadKey(); 
       sub.Dispose(); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("Caught exception on main thread"); 
      } 

     } 

     public static IEnumerable<int> TestEnumerable() 
     { 
      while (true) 
      { 
       yield return 1; 
       Thread.Sleep(200); 
       yield return 2; 
       Thread.Sleep(200); 
       yield return 3; 
       Thread.Sleep(200); 
       throw new InvalidOperationException(); 
      } 
     } 
    } 
} 
+0

是你的使用場景_actually_控制檯應用程序,還是使用UI的東西? – 2011-02-28 20:14:36

+0

此外,您是否考慮過使用'IConnectableObservable'(通過'Publish')而不是'Subject'來共享可觀察的? – 2011-02-28 21:11:20

+0

使用場景是一個控制檯應用程序。我沒有聽說過IConnectableObservable,我會做一些研究。感謝您的幫助......會給您的解決方案一個嘗試。 – Oenotria 2011-02-28 21:29:17

回答

3

解決方案取決於您是否有可用的Dispatcher/SynchronisationContext。在這種情況下,最好使用一個。

解決方案1:調度/ SynchronisationContext是

(即使用WPF,Windows窗體,或自定義調度循環。)

您可以使用ObserveOn + Catch到搬回錯誤到可用調度程序線程。我已經看到這在WPF應用程序中使用,它運行良好。

你如何將你的IScheduler/DispatcherScheduler周圍是由你(我們使用IOC)

public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    IScheduler scheduler) 
{ 
    return source.Catch<T,Exception>(ex => 
     Observable.Throw<T>(ex).ObserveOn(scheduler)); 
} 

// We didn't use it, but this overload could useful if the dispatcher is 
// known at the time of execution, since it's an optimised path 
public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    DispatcherScheduler scheduler) 
{ 
    return source.Catch<T,Exception>(ex => 
     Observable.Throw<T>(ex).ObserveOn(scheduler)); 
} 

解決方案2:沒有可用的調度

使用Console.ReadKey()的相反,使用ManualResetEvent和等待然後拋出可變的錯誤:

 static void Main(string[] args) 
     { 
      try 
      { 
       Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId); 
       var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations 

       // Use subject because we need many subscriptions to a single data source 
       var subject = new Subject<int>(); 

       Exception exception = null; 
       ManualResetEvent mre = new ManualResetEvent(false); 

       using(subject.Subscribe(
        x => Console.WriteLine(x), 
        ex => { exception = ex; mre.Set(); }, 
        () => Console.WriteLine("Subscriber2 Finished"))) 

       using(subject.Subscribe(
        x => Console.WriteLine(x), 
        ex => { exception = ex; mre.Set(); }, 
        () => Console.WriteLine("Subscriber2 Finished"))) 

       using (observable.Subscribe(subject)) 
       { 
        mre.WaitOne(); 
       } 

       if (exception != null) 
        throw exception; 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("Caught exception on main thread"); 
      } 

     }