我有一個應用程序運行與多個觀察員的可觀察間隔。間隔每0.5秒從Web服務器加載一些XML數據,然後觀察者在後臺線程上執行一些特定於應用程序的處理。一旦數據不再需要,訂閱和間隔可觀察得到處理,所以觀察者的OnNext/OnCompleted/OnError將不會再被調用。到現在爲止還挺好。等待接收觀察員完成,而不使用鎖
我的問題:在一些罕見的情況下,有可能在調用Dispose後,我的觀察者的OnNext方法仍在運行!在處理後繼續進行操作之前,我想確保OnNext已經完成。
我目前的解決方案:我在觀察者類中引入了一個更衣室字段(參見代碼)。處置後,我嘗試獲取一個鎖,並且只有在鎖已被獲取後才繼續。雖然這個解決方案有效(?),但它對我而言只是感覺不對。
問:有沒有更優雅,更「Rx方式」來解決這個問題?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
我認爲,你是不是在Rx的精神開展工作,如果執行的是在你的OnNext長期封鎖工作回電話。由於Rx實際上是一個回調管道,您將阻止您的源代碼生產者。您可能需要考慮消息傳遞設計,或者考慮讓OnNext處理程序引入另一個異步層(請參閱嵌套可觀察序列) – 2015-01-29 15:23:01
此外,我建議您不要實現IObserver(或者IObservable )接口。相反,用操作員創建查詢。 –
2015-01-29 15:24:56