2016-11-09 58 views
2

拿這個小腳本(在LINQPad設計,但應該到處運行):Observable.Create:的CancellationToken不過渡到IsCancellationRequested

void Main() 
{ 
    Task.Run(() => Worker()).Wait(); 
} 

async Task Worker() 
{ 
    if (SynchronizationContext.Current != null) 
     throw new InvalidOperationException("Don't want any synchronization!"); 

    BaseClass provider = new Implementation(); 
    Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync = 
     provider.CreateValues; 
    var observable = Observable.Create(subscribeAsync); 

    var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s 
    cancellation.Register(() => Console.WriteLine("token is cancelled now")); 
    await observable 
     .Do(ts => 
     { 
      Console.WriteLine("Elapsed: {0}; cancelled: {1}", 
       ts, 
       cancellation.IsCancellationRequested); 
      cancellation.ThrowIfCancellationRequested(); 
     }) 
     .ToTask(cancellation) 
     .ConfigureAwait(false); 
} 

abstract class BaseClass 
{ 
    // allow implementers to use async-await 
    public abstract Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation); 
} 

class Implementation : BaseClass 
{ 
    // creates Values for 10s; entirely CPU-bound: no way for async-await hence return Task.CompletedTask 
    public override Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation) 
    { 
     try 
     { 
      var sw = Stopwatch.StartNew(); 
      for (int i = 0; i < 10; i++) 
      { 
       for (int j = 0; j < 3; j++) 
       { 
        Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested); 
        Thread.Sleep(333); 
       } 

       if (cancellation.IsCancellationRequested) // !! never gets true !! 
        throw new ApplicationException("token is cancelled"); 

       observer.OnNext(sw.Elapsed); 
      } 

      return Task.CompletedTask; 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex); 
      throw; 
     } 
    } 
} 

的方法Implementation.CreateValues甫一保持運行完整的10秒而不是後停止5.5s。通過Observable.Create傳遞的CancellationToken甚至不會轉換到取消狀態(當然原始令牌)!

這是一個錯誤嗎?做錯事是我的錯嗎?

輸出是:

0/0 cancelled:False 
0/1 cancelled:False 
0/2 cancelled:False 
Elapsed: 00:00:01.0205951; cancelled: False 
1/0 cancelled:False 
1/1 cancelled:False 
1/2 cancelled:False 
Elapsed: 00:00:02.0253279; cancelled: False 
2/0 cancelled:False 
2/1 cancelled:False 
2/2 cancelled:False 
Elapsed: 00:00:03.0274035; cancelled: False 
3/0 cancelled:False 
3/1 cancelled:False 
3/2 cancelled:False 
Elapsed: 00:00:04.0294796; cancelled: False 
4/0 cancelled:False 
4/1 cancelled:False 
4/2 cancelled:False 
Elapsed: 00:00:05.0315332; cancelled: False 
5/0 cancelled:False 
5/1 cancelled:False 
token is cancelled now 
5/2 cancelled:False 
Elapsed: 00:00:06.0335601; cancelled: True 
6/0 cancelled:False 
6/1 cancelled:False 
6/2 cancelled:False 
Elapsed: 00:00:07.0436211; cancelled: True 
7/0 cancelled:False 
7/1 cancelled:False 
7/2 cancelled:False 
Elapsed: 00:00:08.0457921; cancelled: True 
8/0 cancelled:False 
8/1 cancelled:False 
8/2 cancelled:False 
Elapsed: 00:00:09.0477509; cancelled: True 
9/0 cancelled:False 
9/1 cancelled:False 
9/2 cancelled:False 
Elapsed: 00:00:10.0498751; cancelled: True 
[AggregateException] at Main/Task.Wait() 
+0

這似乎是Rx的強制使用。這個國際海事組織的指標是1)Task和Rx的混合,2)通過'IObserver '作爲參數傳遞(不返回IObservable '3)在IObservable上使用'ToTask'並忽略結果'TimeSpan' 。我建議你用TPL/Task或Rx來做到這一點,但選擇一個(或完全是其他)。 –

+0

@LeeCampbell我已經切換到TPL-Dataflow,因爲其他一切都是基於任務的。但是:1)Rx包含許多XxxAsync方法,並聲稱是任務2)的理想伴侶,即Observable.Create()(Rx!)所期望的方法簽名3)爲什麼我應該在最後一個值可觀察到的信號已經發出? – springy76

+0

我不認爲「Rx聲稱是任務的理想伴侶」。它可以與任務一起工作以避免繁重的集成,但主要是我看到使用其中一個或另一個的建議。根據我的經驗,'Observable.Create'簽名幾乎完全用於閉包/ lambda,因此'IObserver '的傳遞已內化到方法的範圍。我的第三點是,如果你不關心可觀察序列的任何值,那麼值得質疑它的用法。很高興聽到你來到TPL-Dataflow的結果:-) –

回答

4

取消令牌獲取傳遞給subscribeAsync功能由Observable.Create調用實例化,是不是你實例化取消標記。

作爲每Observable.Create過載總結:

創建從指定的撤銷 異步可觀察序列訂閱方法。傳遞給 異步Subscribe方法的CancellationToken綁定到返回的一次性訂閱 ,允許盡力而爲的取消。

總之,取消令牌將在您處理訂閱時取消,而不是在指定的延遲之後。

你應該能夠重構你的代碼如下,使其工作:

Observable.Create(observer => subscribeAsync(observer, cancellation)); 

希望它能幫助。

+0

我知道這是一個不同的CancellationToken,我真的想在觀察者停止觀察時立即中止。當我用'await Task.Delay(333);'(這會導致刪除'返回Task.CompletedTask;'並將'async'添加到方法聲明中)替換'Thread.Sleep(333);'時, 。 – springy76

+0

啊,我現在看到你正在使用'''.ToTask(取消)'''來處理可觀測數據。道歉。 值得注意的是,ToTask [重載引入了併發性](https://github.com/Reactive-Extensions/Rx.NET/issues/21)可能會導致您的問題。我寫了[一篇博客文章](http://ian.bebbs.co.uk/posts/RxVsTpl)。 – ibebbs

+0

嗯。在該方法的開始部分,即使只有一個'await Task.Yield();'也足以使其按預期工作。我應該用這個覆蓋哪些不調用任何異步方法嗎? – springy76

0

這是不是一個真正的問題的答案,但使用System.Reactive(用於張貼的評論太多代碼)的System.Threading.Tasks.Dataflow就地示例代碼的重寫:

這有幾個優點:

  1. 因爲observer參數現在是一個Task每個執行有事await的。
  2. 如果需要,以前在Do()(現在在ActionBlock)的處理代碼本身可以實現async。
  3. 如果需要集成緩衝。
  4. 它的解耦=與技術無關:我的接口是Func<TimeSpan, Task<bool>>,因此不依賴於Rx或TPL-Dataflow或其他內容。

新代碼:

void Main() 
{ 
    Task.Run(() => Worker()).Wait(); 
    Console.WriteLine("DONE"); 
} 

async Task Worker() 
{ 
    if (SynchronizationContext.Current != null) 
     throw new InvalidOperationException("Don't want any synchronization!"); 

    var cancellation = new CancellationTokenSource(55000).Token; // gets cancelled after 5.5s 
    cancellation.Register(() => Console.WriteLine("token is cancelled now")); 

    var flow = new ActionBlock<TimeSpan>(
     async ts => 
     { 
      Console.WriteLine("[START] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested); 
      await Task.Delay(2500).ConfigureAwait(false); // processing takes more time than items need to produce 
      Console.WriteLine("[STOP] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested); 
     }, 
     new ExecutionDataflowBlockOptions 
     { 
      BoundedCapacity = 2, // Buffer 1 item ahead 
      EnsureOrdered = true, 
      CancellationToken = cancellation, 
     }); 

    Func<TimeSpan, Task<bool>> observer = ts => flow.SendAsync(ts, cancellation); 

    BaseClass provider = new Implementation(); 
    await provider.CreateValues(observer, cancellation).ConfigureAwait(false); 
    Console.WriteLine("provider.CreateValues done"); 

    flow.Complete(); 
    await flow.Completion.ConfigureAwait(false); 
    Console.WriteLine("flow completed"); 
} 

abstract class BaseClass 
{ 
    // allow implementers to use async-await 
    public abstract Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation); 
} 

class Implementation : BaseClass 
{ 
    public override async Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation) 
    { 
     try 
     { 
      var sw = Stopwatch.StartNew(); 
      for (int i = 0; i < 10; i++) 
      { 
       for (int j = 0; j < 3; j++) 
       { 
        Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested); 
        Thread.Sleep(333); 
       } 

       if (cancellation.IsCancellationRequested) 
        throw new ApplicationException("token is cancelled"); 

       var value = sw.Elapsed; 
       var queued = await observer(value); // use of "observer" encorces async-await even if there is nothing else async 
       Console.WriteLine("[{0}] '{1}' @ {2}", queued ? "enqueued" : "skipped", value, sw.Elapsed); 

       if (!queued) 
        ; // Dispose item 
      } 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex); 
      throw; 
     } 
    } 
}