2014-12-27 79 views
3

我有一個應用程序運行與多個觀察員的可觀察間隔。間隔每0.5秒從Web服務器加載一些XML數據,然後觀察者在後臺線程上執行一些特定於應用程序的處理。一旦數據不再需要,訂閱和間隔可觀察得到處理,所以觀察者的OnNext/OnCompleted/OnError將不會再被調用。到現在爲止還挺好。等待接收觀察員完成,而不使用鎖

我的問題:在一些罕見的情況下,有可能在調用Dispose後,我的觀察者的OnNext方法仍在運行!在處理後繼續進行操作之前,我想確保OnNext已經完成。

我目前的解決方案:我在觀察者類中引入了一個更衣室字段(參見代碼)。處置後,我嘗試獲取一個鎖,並且只有在鎖已被獲取後才繼續。雖然這個解決方案有效(?),但它對我而言只是感覺不對。

問:有沒有更優雅,更「Rx方式」來解決這個問題?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace RxExperimental 
{ 
    internal sealed class MyXmlDataFromWeb 
    { 
     public string SomeXmlDataFromWeb { get; set; } 
    } 

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb> 
    { 
     private readonly object _locker = new object(); 
     private readonly string _observerName; 

     public MyObserver(string observerName) { 
      this._observerName = observerName; 
     } 

     public object Locker { 
      get { return this._locker; } 
     } 

     public void OnCompleted() { 
      lock (this._locker) { 
       Console.WriteLine("{0}: Completed.", this._observerName); 
      } 
     } 

     public void OnError(Exception error) { 
      lock (this._locker) { 
       Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message); 
      } 
     } 

     public void OnNext(MyXmlDataFromWeb value) { 
      lock (this._locker) { 
       Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId); 
       Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb); 
       Thread.Sleep(5000); // simulate some long running operation 
       Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId); 
      } 
     } 
    } 

    internal sealed class Program 
    { 
     private static void Main() { 
      const int interval = 500; 
      // 
      var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => { 
       var data = new MyXmlDataFromWeb { 
        SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now) 
       }; 
       return data; 
      }).Publish(); 
      // 
      var observer1 = new MyObserver("Observer 1"); 
      var observer2 = new MyObserver("Observer 2"); 
      // 
      var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1); 
      var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2); 
      // 
      var connection = dataSource.Connect(); 
      // 
      Console.WriteLine("Press any key to cancel ..."); 
      Console.ReadLine(); 
      // 
      subscription1.Dispose(); 
      subscription2.Dispose(); 
      connection.Dispose(); 
      // 
      lock (observer1.Locker) { 
       Console.WriteLine("Observer 1 completed."); 
      } 
      lock (observer2.Locker) { 
       Console.WriteLine("Observer 2 completed."); 
      } 
      // 
      Console.WriteLine("Can only be executed, after all observers completed."); 
     } 
    } 
} 
+0

我認爲,你是不是在Rx的精神開展工作,如果執行的是在你的OnNext長期封鎖工作回電話。由於Rx實際上是一個回調管道,您將阻止您的源代碼生產者。您可能需要考慮消息傳遞設計,或者考慮讓OnNext處理程序引入另一個異步層(請參閱嵌套可觀察序列) – 2015-01-29 15:23:01

+0

此外,我建議您不要實現IObserver (或者IObservable )接口。相反,用操作員創建查詢。 – 2015-01-29 15:24:56

回答

3

是的,還有一個更接近Rx的方法。

第一個觀察結果是,從可觀察流中取消訂閱基本上與觀察者當前正在發生的內容無關。沒有任何反饋。既然你有要求,當觀察結束時你明確地知道,你需要將它建模成可觀察的流。換句話說,而不是從流中取消訂閱,您應該完成流,因此您將能夠觀察OnComplete事件。在你的情況下,你可以使用TakeUntil來結束observable而不是取消訂閱它。

第二個觀察結果是,您的主程序需要觀察您的「觀察者」何時完成其工作。但是既然你讓你的「觀察者」成爲一個實際的IObservable,你真的沒有辦法做到這一點。當人們第一次開始使用Rx時,這是我看到的一個常見的混淆來源。如果你將「觀察者」建模爲可觀察鏈中的另一個鏈接,那麼你的主程序可以是觀察者。具體而言,您的「觀察者」只不過是一種映射操作(帶有副作用),它將傳入的Xml數據映射爲「已完成」消息。

所以,如果你重構你的代碼,你可以得到你想要的東西......

public class MyObserver 
{ 
    private readonly string _name; 

    public MyObserver(string name) { _name = name; } 

    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source) 
    { 
     return source.Select(value => 
     { 
      Thread.Sleep(5000); // simulate work 
      return Unit.Default; 
     }); 
    } 
} 

// main 
var endSignal = new Subject<Unit>(); 
var dataSource = Observable 
    .Interval(...) 
    .Select(...) 
    .TakeUntil(endSignal) 
    .Publish(); 
var observer1 = new MyObserver("Observer 1"); 
var observer2 = new MyObserver("Observer 2"); 
var results1 = observer1.Handle(dataSource.ObserveOn(...)); 
var results2 = observer2.Handle(dataSource.ObserveOn(...)); 
// since you just want to know when they are all done 
// just merge them. 
// use ToTask() to subscribe them and collect the results 
// as a Task 
var processingDone = results1.Merge(results2).Count().ToTask(); 

dataSource.Connect(); 

Console.WriteLine("Press any key to cancel ..."); 
Console.ReadLine(); 

// end the stream 
endSignal.OnNext(Unit.Default); 

// wait for the processing to complete. 
// use await, or Task.Result 
var numProcessed = await processingDone; 
+0

非常感謝您的意見。但是,這種方法留下了幾個問題沒有答案: 1.雖然在MyOberver的Handle方法中的實際工作可以取代onNext處理程序,但該方法應該如何處理onError和onCompleted事件?那麼如何確保onError/onCompleted完成的工作在取消間隔之後繼續進行操作之前實際完成?當然,我可以添加一次性訂閱,但之後我遇到同樣的問題。 ;-) – dotNZ 2014-12-29 23:42:52

+0

2.在MyObserver的Handle方法中做實際的工作有一個缺點:processingDone.Wait()等待,直到序列中的所有消息都被處理完。這種行爲是不希望的!我希望它在取消間隔後立即忽略/處理所有未完成的消息(如果有的話)(僅等待當前正在運行的onNext/onError/onCompleted完成)。我最初的方法(鎖定)通過處理訂閱來實現這種行爲。 任何想法? – dotNZ 2014-12-29 23:43:45

+0

1 - 在'Select'後面'MyObserver'附加更多的操作符。根據您的具體要求有很多選擇。 [.Do()](http://msdn.microsoft.com/en-us/library/hh229830(v = vs.103).aspx)是一個開始尋找的好地方。 Rx將保證在'processingDone'任務完成之前操作完成。 – Brandon 2014-12-30 00:09:58