2012-04-18 78 views
1

假設我有一些類MyClass。在我的代碼一個部分,我想是這樣的:在另一個地方(文件/項目/時間)我可以獨立訂閱/發佈Reactive Extensions嗎?

Observable.Subscribe<MyClass>(myClass => DoSomething(myClass)); 

然後,我有這樣的事情:

Observable.Publish(instanceOfMyClass); 

這第二行觸發所有方法那些訂閱的確切類的類型。這是Reactive Extensions(v1或v2)支持的東西嗎?

將SynchronizationContext指定爲Subscribe調用的一部分將會很有用。在那裏指定是否應該用WeakReference保持方法也是很好的。 Publish方法應該能夠同步完成,或者給我一些我可以等待的東西。

回答

3

這不會很難創建。

你只需要一個內部Dictionary<Type, Object>,並用它通過Type存儲每個Subject<T>(作爲對象)。

然後,您可以只寫兩個SubscribePublish方法來處理內部字典。

實際上應該很簡單。


而不只是說這很簡單我以爲我會放棄它。

這裏是我的Rx的Pub/Sub類:

public static class RxPS 
{ 
    private static Dictionary<Type, object> _subjects 
     = new Dictionary<Type, object>(); 

    public static IDisposable Subscribe<T>(Action<T> observer) 
    { 
     lock(_subjects) 
     { 
      if (!_subjects.ContainsKey(typeof(T))) 
      { 
       _subjects.Add(typeof(T), new Subject<T>()); 
      } 
      return (_subjects[typeof(T)] as Subject<T>) 
       .Subscribe(observer); 
     } 
    } 

    public static void Publish<T>(T item) 
    { 
     lock(_subjects) 
     { 
      if (_subjects.ContainsKey(typeof(T))) 
      { 
       (_subjects[typeof(T)] as Subject<T>) 
        .OnNext(item); 
      } 
     } 
    } 
} 

,這是它是如何使用:

RxPS.Publish(1); 
var d = RxPS.Subscribe<int>(x => Console.WriteLine(x)); 
RxPS.Publish(2); 
d.Dispose(); 
RxPS.Publish(3); 

其結果是,該代碼會寫2僅控制檯。

享受!

+0

向我解釋「OnNext」?這是否會觸發多個用戶?如果我讓「d」超出範圍會發生什麼?當垃圾收集器「來回」或「不會」時它會處理嗎? – Brannon 2012-04-19 21:50:29

+0

調用'OnNext'給主體下一個自己的可觀察序列的值,這個值將被髮送給主體的任何和所有用戶。如果你讓'd'超出範圍,什麼都不會發生。 GC將收集該變量,但它永遠不會調用Dispose,因此它不會取消任何訂閱。調用'Dispose'總是程序員的責任。在Rx的情況下,如果您想提早取消訂閱,即在自然地調用「OnCompleted」或「OnError」之前,您只需調用Dispose。 – Enigmativity 2012-04-20 00:01:02

3

我相信你正在尋找像ReactiveUI的MessageBus class。這個類使用Rx來實現發佈/訂閱模型,基本上只需要一個Type of Type => IObservables。

+0

Broken link:https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/MessageBus.cs – 2013-10-08 19:59:09

+0

已修復!謝謝.. – 2013-10-08 23:53:58