我在嘗試使用Reactive Extensions(Rx)來處理數據流。但是,每個元素的處理可能需要一些時間。爲了中斷處理,我使用了CancellationToken
,它可以有效地停止訂閱。可觀察訂閱如何正常終止?
當取消請求時,我該如何優雅地完成當前工作並正確終止而不丟失任何數據?
實施例:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500);
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500);
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
輸出:
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
如可以從輸出中可以看出,線「作業結束」是不是最後一行,這意味着清除不會有有足夠的時間在應用程序終止之前完成。
你想輸出什麼? – Shlomo
在此示例中,應用程序線程在訂閱之前退出。我無法找到任何方式來等待訂閱的OnNext處理程序完成其工作。我不希望應用程序在處理程序完成之前退出,因此「作業終止」應該是最後一次打印。 – Reyhn