我在C#中使用.NET的RX庫。任何人都可以向我解釋爲什麼'observer.OnCompleted()方法什麼也不做下面的代碼:RX for .NET - 從Disposable.Create中調用observer.OnCompleted時,什麼也沒有發生
var observableStream = Observable.Create<CustomMessage>(
(observer) =>
{
CustomMessage cm = new CustomMessage();
CustomMessage.Subscribe(observer.OnNext);
return Disposable.Create(
() =>
{
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
observer.OnCompleted(); //***Nothing happens here***
}
);
});
//IObserver.OnException()
public override void OnException(Exception e)
{
Console.WriteLine("Exception occurred - " + e.Message);
}
//IObserver.OnComplete()
public override void OnUnsubscribe()
{
Console.WriteLine("Unsubscribed...");
}
//IObserver.OnNext()
public override void HandleNextMsg(IRVMessage msg)
{
Console.WriteLine("Instance received a message");
}
IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);
//At some later point....
myDisposable.Dispose();
代碼的目的是使訂閱CustomMessages流。它在設置訂閱時使用我的CustomMessage類型註冊observer.OnNext()方法。然後它在註銷訂閱時取消註冊observer.OnNext()。所有這些都能正常工作。每當收到CustomMessage時,我的'HandleNextMsg()'方法都會被調用。
在稍後一點,當我想結束我的訂閱我稱之爲「的Dispose()」和下面兩行執行成功:
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
然後我收到沒有更多CustomMessages。但是下面一行,雖然執行,不執行任何操作:
observer.OnCompleted();
我希望它來調用行:
Console.WriteLine("Unsubscribed...");
在某一點上的觀察者和「OnUnsubscribe」方法之間的連接丟失,我想了解到底發生了什麼。 'observer.OnNext()'如何被成功註銷,但'observer.OnCompleted()'什麼也不做?
有人向我指出,僅僅因爲我處理流並不意味着我應該調用'OnCompleted()',但我仍然想明白爲什麼它不起作用。
非常好的解釋謝謝! – JMc
也適用於Rx Java! – pommedeterresautee