2017-02-24 58 views
1

我有一個事件源生成屬於某些組的事件。我想緩衝這些組並將這些組(批量)發送到存儲。到目前爲止,我有這個:緩衝組按組進行反應擴展,嵌套訂閱

eventSource 
    .GroupBy(event => event.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Subscribe(group => group.Events 
          .Buffer(TimeSpan.FromSeconds(60), 100) 
          .Subscribe(list => SendToStorage(list))); 

所以有一個嵌套的訂閱組中的事件。不知何故,我認爲有更好的方法,但我還沒有弄清楚。

回答

3

這裏的解決方案:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 

這裏有一對夫婦一般規則,可以幫助你 '減少':

1)嵌套訂閱通常在嵌套訂閱之前嵌入訂閱,然後跟着Merge,然後嵌套訂閱。因此應用的是,你會得到這樣的:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 

2)可以很明顯的將兩個連續的選擇(既然你不匿名對象做任何事情,只需刪除):

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 

3)最後,Select後跟一個Merge可以減少到一個SelectMany

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 
+1

一個優秀的,公呈現答案。 – Enigmativity

+0

寫了一些單元測試,以驗證它繼續工作,並像一個魅力。謝謝! –

1

這裏是一個辦法做到這一點

(from g in eventSource.GroupByUntil(e => e.GroupingKey, 
            g => g.Buffer(TimeSpan.FromSeconds(60), 100)) 
from b in g.ToList() 
select b).Subscribe(SendToStorage);