2017-07-19 69 views
1

我在嘗試使用Reactive Extensions(Rx)來處理數據流。但是,每個元素的處理可能需要一些時間。爲了中斷處理,我使用了CancellationToken,它可以有效地停止訂閱。可觀察訂閱如何正常終止?

當取消請求時,我該如何優雅地完成當前工作並正確終止而不丟失任何數據?

實施例:

var cts = new CancellationTokenSource(); 
cts.Token.Register(() => Console.WriteLine("Token cancelled.")); 

var observable = Observable 
    .Interval(TimeSpan.FromMilliseconds(250)); 

observable 
    .Subscribe(
     value => 
      { 
       Console.WriteLine(value); 
       Thread.Sleep(500); 

       if (cts.Token.IsCancellationRequested) 
       { 
        Console.WriteLine("Cancellation detected on {0}.", value); 
        Thread.Sleep(500); 
        Console.WriteLine("Cleaning up done for {0}.", value); 
       } 
      }, 
     () => Console.WriteLine("Completed"), 
     cts.Token); 

Console.ReadLine(); 
cts.Cancel(); 
Console.WriteLine("Job terminated."); 

輸出:

0 
1 
2 
Token cancelled. 
Job terminated. 
Cancellation detected on 2. 
Cleaning up done for 2. 

如可以從輸出中可以看出,線「作業結束」是不是最後一行,這意味着清除不會有有足夠的時間在應用程序終止之前完成。

+0

你想輸出什麼? – Shlomo

+0

在此示例中,應用程序線程在訂閱之前退出。我無法找到任何方式來等待訂閱的OnNext處理程序完成其工作。我不希望應用程序在處理程序完成之前退出,因此「作業終止」應該是最後一次打印。 – Reyhn

回答

2

使用similar question的答案和a question about waiting before terminating的答案,我找到了一個我想要的解決方案。

我最初的問題是,我發現沒有辦法等待訂閱的線程。上面鏈接的答案導致我通過三種方式重構代碼:

  1. 我將取消邏輯從訂閱移到observable。

  2. 該訂閱包裝在自己的Task(所以執行可以繼續到ReadLine -statement)。

  3. A ManualResetEvent被引入來控制應用程序退出策略。

解決方案:

var reset = new ManualResetEvent(false); 

var cts = new CancellationTokenSource(); 
cts.Token.Register(() => Console.WriteLine("Token cancelled.")); 

var observable = Observable 
    .Interval(TimeSpan.FromMilliseconds(250)) 
    .TakeWhile(x => !cts.Token.IsCancellationRequested) 
    .Finally(
     () => 
      { 
       Console.WriteLine("Finally: Beginning finalization."); 
       Thread.Sleep(500); 
       Console.WriteLine("Finally: Done with finalization."); 
       reset.Set(); 
      }); 

await Task.Factory.StartNew(
    () => observable 
     .Subscribe(
      value => 
       { 
        Console.WriteLine("Begin: {0}", value); 
        Thread.Sleep(2000); 
        Console.WriteLine("End: {0}", value); 
       }, 
      () => Console.WriteLine("Completed: Subscription completed.")), 
    TaskCreationOptions.LongRunning); 

Console.ReadLine(); 
cts.Cancel(); 
reset.WaitOne(); 
Console.WriteLine("Job terminated."); 

輸出:

Begin: 0 
End: 0 
Begin: 1 
Token cancelled. 
End: 1 
Completed: Subscription completed. 
Finally: Beginning finalization. 
Finally: Done with finalization. 
Job terminated. 

是相當新的,以無擴展,我不知道這是否是我的問題的最佳解決方案。但是對於問題中發佈的示例來說,這是一個很大的改進,因爲它滿足了我的要求:

  • 每個OnNext操作都允許運行完成。
  • 應用程序等待流處理完成(由ManualResetEvent發送信號)。
  • 將流消除邏輯移至TakeWhile-方法中的生產者(而不是消費者)。
  • 應用程序終止邏輯是對生產者的Finally-方法中的流消除的反應。

這是一個更好的解決方案。

2

如果我正確地理解了這個問題,這不是一個Rx問題,這是一個'無論你在做什麼訂閱'問題。您的訂閱操作需要半秒鐘,可能需要半秒鐘才能完成清理,而您的工作終止需要幾秒鐘的時間。你希望在取消和終止之間擠入什麼?

我可以給你的最好建議是讓訂閱操作比呼叫Thread.Sleep更好地兌現取消標記。

+0

我希望有某種機制允許應用程序**等待訂閱的OnNext處理程序來完成其工作。 – Reyhn

+0

@Reyhn - 使用1WaitHandle'。 – Enigmativity

+0

@Reyhn - 使用1WaitHandle'。 – Enigmativity