4
我使用Rx中的BufferWithTime()來批量處理消息。如果我的OnNext方法比使用的時間間隔花費更長的時間,可能最終會有兩個OnNext方法的併發調用?來自Reactive Extensions的BufferWithTime會重疊地調用OnNext嗎?
換言之,BufferWithTime()調用中指定的時間間隔是以絕對值計算還是在我的OnNext方法返回後用作「異步睡眠」?
我使用Rx中的BufferWithTime()來批量處理消息。如果我的OnNext方法比使用的時間間隔花費更長的時間,可能最終會有兩個OnNext方法的併發調用?來自Reactive Extensions的BufferWithTime會重疊地調用OnNext嗎?
換言之,BufferWithTime()調用中指定的時間間隔是以絕對值計算還是在我的OnNext方法返回後用作「異步睡眠」?
不,呼叫OnNext
將排隊。下面的代碼寫入到跟蹤時的緩衝時間的經過,以及開始/的OnNext呼叫到該用戶(在其中休眠的緩衝時間的4倍)的端:
Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
.Take(50)
.BufferWithTime(TimeSpan.FromMilliseconds(100))
.Do(_ => Trace.WriteLine("Buffer elapsed"))
.ObserveOn(Scheduler.TaskPool)
.Subscribe(_ =>
{
Trace.WriteLine("Begin OnNext");
Thread.Sleep(200);
Trace.WriteLine("End OnNext");
});
的輸出是如下。你可以看到,開始/結束OnNext無重疊,即使有Buffer elapsed
的OnNext
在通話過程中出現了兩次:
緩衝經過
開始OnNext
緩衝經過
緩衝經過
結束OnNext
開始OnNext
緩衝區已用
緩衝區已用
緩衝區已用
結束OnNext
開始OnNext
結束OnNext
開始OnNext
結束OnNext
開始OnNext
結束OnNext
開始OnNext
結束OnNext