2012-11-26 78 views
1

我在C#中使用.NET的RX庫。任何人都可以向我解釋爲什麼'observer.OnCompleted()方法什麼也不做下面的代碼:RX for .NET - 從Disposable.Create中調用observer.OnCompleted時,什麼也沒有發生

var observableStream = Observable.Create<CustomMessage>(
     (observer) => 
      { 
       CustomMessage cm = new CustomMessage(); 
       CustomMessage.Subscribe(observer.OnNext); 

       return Disposable.Create(
       () => 
        { 
         Console.WriteLine("Disposing..."); 
         CustomMessage.Unsubscribe(observer.OnNext);       
         observer.OnCompleted();    //***Nothing happens here*** 
        } 
       ); 
     }); 

    //IObserver.OnException() 
    public override void OnException(Exception e) 
    { 
     Console.WriteLine("Exception occurred - " + e.Message); 
    } 

    //IObserver.OnComplete() 
    public override void OnUnsubscribe() 
    { 
     Console.WriteLine("Unsubscribed...");    
    } 

    //IObserver.OnNext() 
    public override void HandleNextMsg(IRVMessage msg) 
    { 
     Console.WriteLine("Instance received a message"); 
    } 

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe); 

//At some later point.... 
myDisposable.Dispose(); 

代碼的目的是使訂閱CustomMessages流。它在設置訂閱時使用我的CustomMessage類型註冊observer.OnNext()方法。然後它在註銷訂閱時取消註冊observer.OnNext()。所有這些都能正常工作。每當收到CustomMessage時,我的'HandleNextMsg()'方法都會被調用。

在稍後一點,當我想結束我的訂閱我稱之爲「的Dispose()」和下面兩行執行成功:

Console.WriteLine("Disposing..."); 
CustomMessage.Unsubscribe(observer.OnNext); 

然後我收到沒有更多CustomMessages。但是下面一行,雖然執行,不執行任何操作:

observer.OnCompleted(); 

我希望它來調用行:

Console.WriteLine("Unsubscribed..."); 

在某一點上的觀察者和「OnUnsubscribe」方法之間的連接丟失,我想了解到底發生了什麼。 'observer.OnNext()'如何被成功註銷,但'observer.OnCompleted()'什麼也不做?

有人向我指出,僅僅因爲我處理流並不意味着我應該調用'OnCompleted()',但我仍然想明白爲什麼它不起作用。

回答

2

您看到的問題是由於您通過的功能Observable.Create和您訂閱IObservable所產生的任何觀察者而造成的。基本上,流程如下:

  • Observable.Create返回AnonymousObservable。
  • AnonymousObservable將觀察者封裝在Subscribe中的AutoDetachObserver中。
  • AnonymousObservable從Subscribe中返回AutoDetachObserver(實現IDisposable)。
  • AutoDetachObserver.Dispose設置其停止標誌和然後配置對象您原來的訂閱功能返回。此標誌導致觀察者忽略對OnError和OnCompleted的未來調用,從而導致不會調用包裝的觀察者方法。

這個答案是基於RX的v1.x,但我預計這在v2.0中並沒有改變。

如果您有一些代碼需要運行,無論訂閱結束如何(OnError,OnCompleted或Dispose),我會建議Observable.Finally

+0

非常好的解釋謝謝! – JMc

+0

也適用於Rx Java! – pommedeterresautee

3

OnCompleted()是爲了通知訂閱者上游序列(您的案例中的CustomMessage)已經結束。這並不意味着訂戶要求取消訂閱成功的確認,這似乎是您如何使用它。 OnCompleted()是關於序列的通知,對於所有訂閱者,不是針對該序列的單個訂閱。

換句話說,您不應該在Dispose中調用它。畢竟,用戶正在自行解散,爲什麼需要通知?

至於沒有任何事情發生的實際技術原因,我猜測回調(通過設計)不會在您已經處置時進行。只是一個理論,這不是很相關。

+1

很好的解釋謝謝。正如我所提到的,我後來猜測,調用OnCompleted是不正確的,但你已經給出了一個很好的解釋,爲什麼這是這種情況,謝謝。 – JMc

相關問題