2012-03-05 79 views
7

其他東西Ob​​servable.Buffer我一直在尋找如何在RX使用Observable.Buffer例子,但找不到任何比鍋爐板時間緩衝的東西更充實。是否有可能比上一次

有似乎是指定一個「bufferClosingSelector」過載,但我不能完成我的腦海裏解決它。

我試圖做的是創建一個按時間或按一個「吸籌」緩衝的序列。 考慮一個請求流,其中每個請求都有一定的權重,而且我不想一次處理超過x累積的權重,或者如果沒有足夠的累積權限,只需要給我上一個時間幀中的內容(常規緩衝區功能)

回答

13

bufferClosingSelector是一個名爲每次獲得可觀察到的,當預計緩衝被關閉,這將產生一個值的功能。

例如,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))作品像普通Buffer(time)超載。

在要重量序列,您可以通過序列應用Scan,然後在你的聚合條件決定。

例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)讓你產生當源序列加起來超過100

您可以使用Amb比賽這兩個成交條件,看看哪一個值序列第一反應:

 .Buffer(() => Observable.Amb 
        (
          Observable.Timer(TimeSpan.FromSeconds(1)), 
          source.Scan((a,c) => a + c).SkipWhile(a => a < 100) 
        ) 
       ) 

可以使用任何系列組合子的產生用於緩衝在該點處被關閉的任何值。

注意: 關閉選擇器給出的值並不重要 - 這是重要的通知。因此,要將不同類型的來源與Amb組合起來,只需將其更改爲System.Reactive.Unit即可。

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit()) 
+0

只是一個簡單的說明,當源是其他類型的可觀察類型時,amb似乎不起作用 – Dmitry 2012-03-06 21:05:19

+0

@Dmitry我只是給出了基本的想法。我編輯過它以包含不同類型的示例。 – Asti 2012-03-07 06:01:38

+0

是否可以從觀察者訪問緩衝區關閉值?例如。時間戳緩衝區用於關閉。 – liang 2015-07-17 19:49:07