拿這個小腳本(在LINQPad設計,但應該到處運行):Observable.Create:的CancellationToken不過渡到IsCancellationRequested
void Main()
{
Task.Run(() => Worker()).Wait();
}
async Task Worker()
{
if (SynchronizationContext.Current != null)
throw new InvalidOperationException("Don't want any synchronization!");
BaseClass provider = new Implementation();
Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync =
provider.CreateValues;
var observable = Observable.Create(subscribeAsync);
var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
cancellation.Register(() => Console.WriteLine("token is cancelled now"));
await observable
.Do(ts =>
{
Console.WriteLine("Elapsed: {0}; cancelled: {1}",
ts,
cancellation.IsCancellationRequested);
cancellation.ThrowIfCancellationRequested();
})
.ToTask(cancellation)
.ConfigureAwait(false);
}
abstract class BaseClass
{
// allow implementers to use async-await
public abstract Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation);
}
class Implementation : BaseClass
{
// creates Values for 10s; entirely CPU-bound: no way for async-await hence return Task.CompletedTask
public override Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation)
{
try
{
var sw = Stopwatch.StartNew();
for (int i = 0; i < 10; i++)
{
for (int j = 0; j < 3; j++)
{
Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested);
Thread.Sleep(333);
}
if (cancellation.IsCancellationRequested) // !! never gets true !!
throw new ApplicationException("token is cancelled");
observer.OnNext(sw.Elapsed);
}
return Task.CompletedTask;
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
}
}
的方法Implementation.CreateValues
甫一保持運行完整的10秒而不是後停止5.5s。通過Observable.Create
傳遞的CancellationToken
甚至不會轉換到取消狀態(當然原始令牌)!
這是一個錯誤嗎?做錯事是我的錯嗎?
輸出是:
0/0 cancelled:False
0/1 cancelled:False
0/2 cancelled:False
Elapsed: 00:00:01.0205951; cancelled: False
1/0 cancelled:False
1/1 cancelled:False
1/2 cancelled:False
Elapsed: 00:00:02.0253279; cancelled: False
2/0 cancelled:False
2/1 cancelled:False
2/2 cancelled:False
Elapsed: 00:00:03.0274035; cancelled: False
3/0 cancelled:False
3/1 cancelled:False
3/2 cancelled:False
Elapsed: 00:00:04.0294796; cancelled: False
4/0 cancelled:False
4/1 cancelled:False
4/2 cancelled:False
Elapsed: 00:00:05.0315332; cancelled: False
5/0 cancelled:False
5/1 cancelled:False
token is cancelled now
5/2 cancelled:False
Elapsed: 00:00:06.0335601; cancelled: True
6/0 cancelled:False
6/1 cancelled:False
6/2 cancelled:False
Elapsed: 00:00:07.0436211; cancelled: True
7/0 cancelled:False
7/1 cancelled:False
7/2 cancelled:False
Elapsed: 00:00:08.0457921; cancelled: True
8/0 cancelled:False
8/1 cancelled:False
8/2 cancelled:False
Elapsed: 00:00:09.0477509; cancelled: True
9/0 cancelled:False
9/1 cancelled:False
9/2 cancelled:False
Elapsed: 00:00:10.0498751; cancelled: True
[AggregateException] at Main/Task.Wait()
這似乎是Rx的強制使用。這個國際海事組織的指標是1)Task和Rx的混合,2)通過'IObserver'作爲參數傳遞(不返回IObservable '3)在IObservable上使用'ToTask'並忽略結果'TimeSpan' 。我建議你用TPL/Task或Rx來做到這一點,但選擇一個(或完全是其他)。 –
@LeeCampbell我已經切換到TPL-Dataflow,因爲其他一切都是基於任務的。但是:1)Rx包含許多XxxAsync方法,並聲稱是任務2)的理想伴侶,即Observable.Create()(Rx!)所期望的方法簽名3)爲什麼我應該在最後一個值可觀察到的信號已經發出? – springy76
我不認爲「Rx聲稱是任務的理想伴侶」。它可以與任務一起工作以避免繁重的集成,但主要是我看到使用其中一個或另一個的建議。根據我的經驗,'Observable.Create'簽名幾乎完全用於閉包/ lambda,因此'IObserver'的傳遞已內化到方法的範圍。我的第三點是,如果你不關心可觀察序列的任何值,那麼值得質疑它的用法。很高興聽到你來到TPL-Dataflow的結果:-) –