2010-02-13 47 views
3

我正在努力與我的第一個簡單的「hello world」RX應用程序。我正在使用VS2010 RC,以及最新的RX下載。第一步與反應式擴展蹣跚步驟

以下是簡單的控制檯應用程序;

class Program 
    { 
     static void Main(string[] args) 
     { 

      var channel = new MessageChannel() 
       .Where(m => m.process) 
       .Subscribe((MyMessage m) => Console.WriteLine(m.subject)); 

      //channel.GenerateMsgs(); 
     } 
    } 

    public class MyMessage 
    { 
     public string subject; 
     public bool process; 
    } 

    public class MessageChannel: IObservable<MyMessage> 
    { 
     List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>(); 

     public IDisposable Subscribe(IObserver<MyMessage> observer) 
     { 
      observers.Add(observer); 
      return observer as IDisposable; 
     } 

     public void GenerateMsgs() 
     { 
      foreach (IObserver<MyMessage> observer in observers) 
      { 
       observer.OnNext(new MyMessage() {subject = "Hello!", process = true}); 
      } 
     } 
    } 

我在Where子句中得到一個ArgumentNullException。這是堆棧;

System.ArgumentNullException was unhandled 
    Message=Value cannot be null. 
Parameter name: disposable 
    Source=System.Reactive 
    ParamName=disposable 
    StackTrace: 
     at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable) 
     at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() 
     at System.Threading.Scheduler.NowScheduler.Schedule(Action action) 
     at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer) 
     at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18 
     at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args) 
     at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args) 
     at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly() 
     at System.Threading.ThreadHelper.ThreadStart_Context(Object state) 
     at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx) 
     at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) 
     at System.Threading.ThreadHelper.ThreadStart() 
    InnerException: 

回答

1

此行似乎引起大驚小怪:

return observer as IDisposable; 

你不應該承擔的觀察者是一次性的,你應該回到它知道「退訂」的一次性對象。

該方法返回對IDSposable接口的引用。這使得 觀察者能夠在提供者完成之前取消訂閱(即, 停止接收通知) 發送它們並且呼叫 訂戶的OnCompleted方法。

你可以把它用做類似工作:

public class MessageChannel: IObservable<MyMessage> 
{ 
    class Subscription : IDisposable { 
     MessageChannel _c; 
     IObservable<MyMessage> _obs; 
     public Subscription(MessageChannel c, IObservable<MyMessage> obs) { 
      _c = c; _obs = obs; 
     } 
     public void Dispose() { 
      _c.Unsubscribe(_obs); 
     } 
    } 

    public IDisposable Subscribe(IObserver<MyMessage> observer) 
    { 
     observers.Add(observer); 
     return new Subscription(this, observer); 
    } 

    void Unsubscribe(IObservable<MyMessage> obs) { 
     observers.Remove(obs); 
    } 
} 
+0

非常感謝弗蘭克,那打在頭上。只需要改變Subcribe類來使用IObserver而不是IObservable(我不抱怨!)。我使用了IObserverable提供的MSDN示例(http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx) – 2010-02-13 19:33:33

+0

您也可以使用'Disposable.Create(()=> Unsubscribe(觀察者)) - 'System.Reactive.Disposables'命名空間有許多有用的IDisposable實現 – AlexFoxGill 2013-03-28 09:19:12

1

!!紅旗!

我強烈建議您自己不要執行IObserver<T>IObservable<T>。贊成使用Observable.Create<T>或作爲最後的手段使用Subject類型。爲了正確實現這些接口,需要考慮很多事情,這些接口由正確的Rx類型和運算符爲您處理。

在這個例子中,我會勸你放棄的MessageChannel類型和交換它

class Program 
{ 
    static void Main(string[] args) 
    { 
     var channel = GenerateMsgs() 
      .Where(m => m.process) 
      .Subscribe((MyMessage m) => Console.WriteLine(m.subject)); 
    } 

    public IObservable<MyMessage> GenerateMsgs() 
    { 
     return Observable.Create<MyMessage>(observer=> 
     { 
      observer.OnNext(new MyMessage() {subject = "Hello!", process = true}); 
     }); 
    } 
} 

public class MyMessage 
{ 
    public string subject; 
    public bool process; 
} 

在系統設計的進一步檢查,你可能有某種服務的一個公開的「通道」作爲觀察序列。

public interface OrderService 
{ 
    IObservable<OrderRequest> OrderRequests(); 
    IObservable<Order> ProcessedOrders(); 
    IObservable<OrderRejection> OrdersRejections(); 
} 

從而否定爲IObserver<T>IObservable<T>這些自定義實現的需要。