2017-02-14 67 views
6

我需要實現Rx.NET如下算法:在Rx.NET處理背壓不onBackpressureLatest

  1. stream就拿最新的項目,或等待新的項目,而不阻塞,如果沒有新項目。只有最新的項目很重要,其他人可以放棄。請輸入SlowFunction並打印輸出。從步驟
  2. 重複1

天真的解決方案是:

let PrintLatestData (stream: IObservable<_>) = 
    stream.Select(SlowFunction).Subscribe(printfn "%A") 

然而,這種解決方案並不因爲平均stream發出物品超過SlowFunction速度可以消耗他們的工作。由於Select不會刪除項目,而是嘗試按從最舊到最新的順序處理每個項目,因此隨着程序運行,發送和打印的項目之間的延遲將逐漸趨於無窮大。爲避免這種無限增長的背壓,只應從流中採取最新的項目。

我搜索了文檔,在RxJava中找到了一個名爲onBackpressureLatest的方法,這對我來說可以做到我上面所描述的。但是,該方法在Rx.NET中不存在。如何在Rx.NET中實現這一點?

+1

做什麼用'SlowFunction'比流慢的問題? –

+0

@FyodorSoikin如果'SlowFunction'比流緩慢,新項目的發射速度比它們可以處理和打印的速度要快。因此,隨着程序的運行,正在打印的新項目與正在打印的項目的「SlowFunction」輸出之間的延遲/延遲會朝向無限增長。這是不可接受的,因爲我需要實時監控數據。我只關心最新的項目。 – Steve

+1

'SlowFunction'是否同步?還是Observable/Async? – Shlomo

回答

7

我想你想用ObserveLatestOn之類的東西。它有效地用一個值和一個標誌替換傳入事件的隊列。

詹姆斯世界已經在博客在這裏http://www.zerobugbuild.com/?p=192

的概念是在傾斜信任的速度有多快,服務器可以在其推數據GUI應用程序大量使用。

您還可以看到在無交易https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 和支持演示的實現解釋ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014

需要明確的是,這是一個甩負荷算法,而不是一個背壓算法。

+0

'ObserveLatestOn'正是我所需要的,謝謝! – Steve

+1

雖然這可能在理論上回答這個問題,但[這將是更可取的](// meta.stackoverflow.com/q/8259)在這裏包括答案的基本部分,並提供供參考的鏈接。 – Draken

+0

或者指出它可能是http://stackoverflow.com/questions/6384312/how-can-i-observe-values-in-a-non-blocking-way-using-rx和http:/ /stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is/。我想這個答案是探測,我不確定他是否想要背壓(按照標題)或卸載(根據說明) –

1

同步/異步建議可能會有所幫助,但是,由於慢速函數始終比事件流慢,因此異步可能允許您以((在線程池中觀察)並行處理最終)只是用盡線程或通過上下文切換添加更多延遲。這聽起來不像是我的解決方案。

我建議你看一下由Dave Sexton編寫的開源Rxx'內省'操作符。這些可以改變最新的緩衝器/節流時間,因爲隊列由於消費緩慢而備份。如果慢速函數突然變得更快,它不會緩衝任何東西。如果變慢,它會緩衝更多。 您必須檢查是否有'最新'類型,或者只是修改現有的以適應您的需求。例如。使用緩衝區,只需緩衝區中的最後一項,或進一步增強內部只存儲最新的。 Google'Rxx',你會在Github的某處找到它。

如果「慢速功能」的時間相當可預測,則更簡單的方法是簡單地將流量限制在超過此時間的數量。顯然,我並不是指標準的rx'油門',而是一個通過更新而不是舊的更新。這裏有很多解決這類問題的方法。

1

前段時間我也遇到了同樣的問題,而且我沒有找到一個內置的操作符來完成這個操作。所以我寫了我自己的,我叫Latest。不是微不足道的實施,而是發現它在我目前的項目中非常有用。

它的工作原理如下:當觀察者正忙於處理先前的通知(當然是在它自己的線程上)時,它會盡快將隊列中的最後n個通知(n> = 0)和OnNext作爲觀察者它變得空閒。所以:

  • Latest(0):只觀察到達項目,而觀察者處於閒置狀態
  • Latest(1):始終遵守最新的
  • Latest(1000)(EG):一般處理所有的項目,但如果東西卡住了線,而想念一些比得到一個OutOfMemoryException
  • Latest(int.MaxValue):不要錯過任何一個項目,但負載平衡生產者和消費者之間。因此

您的代碼將是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

簽名看起來是這樣的:

/// <summary> 
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. 
/// </summary> 
/// <param name="source">The source sequence.</param> 
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> 
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> 
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> 
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> 
/// <remarks> 
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. 
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. 
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. 
/// </remarks> 
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) 

實現太大,張貼在這裏,但如果有人有興趣,我會高興地分享。讓我知道。

1

你可以在sample流中間隔你知道SlowFunction可以處理。以下是一個Java的例子:

TestScheduler ts = new TestScheduler(); 

Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500); 
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println); 

ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); 
98 
198 
298 
398 
498 
499 

sample不會導致反壓始終抓住流中的最新值,所以它符合您的要求。另外sample將不會發送兩次相同的值(可以從上面看到,因爲499只打印一次)

我認爲這將是一個有效的C#/F#解決方案:

static IDisposable PrintLatestData<T>(IObservable<T> stream) { 
    return stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(Console.WriteLine); 
} 
let PrintLatestData (stream: IObservable<_>) = 
    stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(printfn "%A") 
+1

這是一個非常簡單的解決方案,但在我的特殊用例SlowFunction的運行時間變化太大,這種方法的工作。感謝分享。 – Steve