前段時間我也遇到了同樣的問題,而且我沒有找到一個內置的操作符來完成這個操作。所以我寫了我自己的,我叫Latest
。不是微不足道的實施,而是發現它在我目前的項目中非常有用。
它的工作原理如下:當觀察者正忙於處理先前的通知(當然是在它自己的線程上)時,它會盡快將隊列中的最後n個通知(n> = 0)和OnNext
作爲觀察者它變得空閒。所以:
Latest(0)
:只觀察到達項目,而觀察者處於閒置狀態
Latest(1)
:始終遵守最新的
Latest(1000)
(EG):一般處理所有的項目,但如果東西卡住了線,而想念一些比得到一個OutOfMemoryException
Latest(int.MaxValue)
:不要錯過任何一個項目,但負載平衡生產者和消費者之間。因此
您的代碼將是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")
簽名看起來是這樣的:
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
實現太大,張貼在這裏,但如果有人有興趣,我會高興地分享。讓我知道。
做什麼用'SlowFunction'比流慢的問題? –
@FyodorSoikin如果'SlowFunction'比流緩慢,新項目的發射速度比它們可以處理和打印的速度要快。因此,隨着程序的運行,正在打印的新項目與正在打印的項目的「SlowFunction」輸出之間的延遲/延遲會朝向無限增長。這是不可接受的,因爲我需要實時監控數據。我只關心最新的項目。 – Steve
'SlowFunction'是否同步?還是Observable/Async? – Shlomo