2016-04-14 58 views
0

嘗試使用RX模擬系統發送來自多個發佈者的通知。合併RX中的多個自定義觀測值

我有兩個自定義接口ITopicObservable和ITopicObserver來模擬實現類除了IObservable和IObserver接口之外還有其他屬性和方法的事實。

我現在的問題是我的想法是我應該可以添加一些observables在一起,將它們合併在一起並訂閱觀察者以提供來自所有合併的observables的更新。但是,帶有「問題」註釋的代碼會拋出無效的轉換異常。

用例是一些獨立的傳感器,每個傳感器都會監測一個盒子中的溫度,例如將所有報告彙總到溫度健康狀況監測器隨後訂閱的一個溫度報告。

我在這裏錯過了什麼?還是有更好的方法來實現使用RX的情況?下面

using System; 
using System.Reactive.Linq; 
using System.Collections.Generic; 

namespace test 
{ 
class MainClass 
{ 
    public static void Main (string[] args) 
    { 
     Console.WriteLine ("Hello World!"); 
     var to = new TopicObserver(); 
     var s = new TopicObservable ("test"); 

     var agg = new AggregatedTopicObservable(); 
     agg.Add (s); 

     agg.Subscribe (to); 
    } 
} 

public interface ITopicObservable<TType>:IObservable<TType> 
{ 
    string Name{get;} 
} 

public class TopicObservable:ITopicObservable<int> 
{ 
    public TopicObservable(string name) 
    { 
     Name = name; 
    } 
    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     return null; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 

    #endregion 
} 

public class AggregatedTopicObservable:ITopicObservable<int> 
{ 
    List<TopicObservable> _topics; 
    private ITopicObservable<int> _observable; 
    private IDisposable _disposable; 

    public AggregatedTopicObservable() 
    { 
     _topics = new List<TopicObservable>(); 
    } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add ((TopicObservable)observable); 
    } 

    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     _observable = (ITopicObservable<int>)_topics.Merge(); 

     _disposable = _observable.Subscribe(observer); 

     return _disposable; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 
    #endregion 

} 



public interface ITopicObserver<TType>:IObserver<TType> 
{ 
    string Name{get;} 
} 

public class TopicObserver:ITopicObserver<int> 
{ 
    #region IObserver implementation 
    public void OnNext (int value) 
    { 
     Console.WriteLine ("next {0}", value); 
    } 
    public void OnError (Exception error) 
    { 
     Console.WriteLine ("error {0}", error.Message); 
    } 
    public void OnCompleted() 
    { 
     Console.WriteLine ("finished"); 
    } 
    #endregion 
    #region ITopicObserver implementation 
    public string Name { get;private set;} 
    #endregion 

} 
} 

回答

1

.Merge(...)操作您使用的簽名TibRv主題層的Rx的例子是:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources) 

這個.Merge()返回的實際類型是:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32] 

...所以它應該很清楚,調用(ITopicObservable<int>)_topics.Merge();會失敗。

李建議不要實施IObservable<>IObserver<>是正確的。它導致像上面那樣的錯誤。

如果你不得不做這樣的事情,我會做這種方式:

public interface ITopic 
{ 
    string Name { get; } 
} 

public interface ITopicObservable<TType> : ITopic, IObservable<TType> 
{ } 

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType> 
{ } 

public interface ITopicObserver<TType> : ITopic, IObserver<TType> 
{ } 

public class Topic 
{ 
    public string Name { get; private set; } 

    public Topic(string name) 
    { 
     this.Name = name; 
    } 
} 

public class TopicSubject : Topic, ITopicSubject<int> 
{ 
    private Subject<int> _subject = new Subject<int>(); 

    public TopicSubject(string name) 
     : base(name) 
    { } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _subject.Subscribe(observer); 
    } 

    public void OnNext(int value) 
    { 
     _subject.OnNext(value); 
    } 

    public void OnError(Exception error) 
    { 
     _subject.OnError(error); 
    } 

    public void OnCompleted() 
    { 
     _subject.OnCompleted(); 
    } 
} 

public class AggregatedTopicObservable : Topic, ITopicObservable<int> 
{ 
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>(); 

    public AggregatedTopicObservable(string name) 
     : base(name) 
    { } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add(observable); 
    } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _topics.Merge().Subscribe(observer); 
    } 
} 

public class TopicObserver : Topic, ITopicObserver<int> 
{ 
    private IObserver<int> _observer; 

    public TopicObserver(string name) 
     : base(name) 
    { 
     _observer = 
      Observer 
       .Create<int>(
        value => Console.WriteLine("next {0}", value), 
        error => Console.WriteLine("error {0}", error.Message), 
        () => Console.WriteLine("finished")); 
    } 

    public void OnNext(int value) 
    { 
     _observer.OnNext(value); 
    } 
    public void OnError(Exception error) 
    { 
     _observer.OnError(error); 
    } 
    public void OnCompleted() 
    { 
     _observer.OnCompleted(); 
    } 
} 

而且隨着運行:

var to = new TopicObserver("watching"); 
var ts1 = new TopicSubject("topic 1"); 
var ts2 = new TopicSubject("topic 2"); 

var agg = new AggregatedTopicObservable("agg"); 

agg.Add(ts1); 
agg.Add(ts2); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

其中給出:

 
next 42 
next 1 
finished 

但除了能夠給所有的名字(我不知道它是如何幫助的),你總是可以這樣做:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

var agg = new [] { ts1, ts2 }.Merge(); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

相同的輸出沒有接口和類。

還有更有趣的方法。試試這個:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var agg = new Subject<IObservable<int>>(); 

agg.Merge().Subscribe(to); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

agg.OnNext(ts1); 
agg.OnNext(ts2); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

var ts3 = new Subject<int>(); 

agg.OnNext(ts3); 

ts3.OnNext(99); 
ts3.OnCompleted(); 

這將產生:

 
next 42 
next 1 
next 99 

它可以讓你在合併後新添加源可觀!

+0

非常感謝!爲了回答關於接口使用的問題,我們例如有許多溫度探頭監測系統的不同部分。每個探針都有一個唯一的ID /名稱他們獨立記錄他們的溫度文件。將會有一個聚合溫度監控器有效地監控整個系統,這是AggregatedTopic進入的地方。還會有另一個實體(Observer)根據AggregatedTopic信息做出打開或關閉的決定。所有這些與日誌記錄。有沒有更好的方法來實現這一目標? – Bernard

+0

@Bernard - 是的,有一個更簡單的方法來做到這一點。您不應該擴展接口。相反,您應該創建一個包含所生成的id,名稱和值的自定義對象。然後你創建可觀察對象來返回這個自定義對象 - 你可以很簡單地進行合併。你真的應該問這是一個單獨的問題,我很樂意給你一個答案。如果你願意的話,請詳細告訴你如何知道溫度探頭什麼時候產生一個值(顯示代碼)。 – Enigmativity

+0

我已根據要求創建了一個新問題http://stackoverflow.com/questions/36723106/building-a-sensor-monitoring-system-using-rx。非常感謝您的幫助! – Bernard

2

我首先想到的

代碼,是你不應該執行IObservable<T>,你應該揭露它作爲屬性或方法的結果組成的。

第二個想法是Rx中有運營商擅長將多個序列合併/聚合在一起。 你應該喜歡使用這些。

第三,這類似於第一,你一般不會實現IObserver<T>,你只要訂閱可觀察序列,並提供代表每個回撥(OnNextOnErrorOnComplete

所以,你的代碼基本上降低到

Console.WriteLine("Hello World!"); 
var topic1 = TopicListener("test1"); 
var topic2 = TopicListener("test2"); 

topic1.Merge(topic2) 
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);}, 
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);}, 
    () => {Console.WriteLine("All topics have completed.");}); 

哪裏TopicListener(string)僅僅是返回IObservable<T>的方法。 TopicListener(string)方法的執行很可能使用Observable.Create

它可能有助於查看在基於主題的郵件系統上映射Rx的示例。 還有就是如何能夠在這裏https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq

+0

感謝您的支持!我很欣賞這個明確的答案,但我認爲@Enigmativity更具描述性。 – Bernard

+0

是的,他的回答非常詳細。儘管對這些「主題」感到厭倦! –

+0

Campell,缺點是什麼? – Bernard