2011-02-11 34 views
3

所以我試圖使用反應來重新組織ID標識的分塊郵件,並有一個問題終止最後的可觀察。我有一個消息類,包括身份證,總大小,有效載荷,組塊數量和類型的有以下客戶端代碼:使用Reactives合併塊化郵件

我需要計算郵件的數量在運行時採取

(from messages in 
    (from messageArgs in Receive select Serializer.Deserialize<Message>(new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message)))) 
group messages by messages.Id into grouped select grouped) 
.Subscribe(g => 
{ 
    var cache = new List<Message>(); 
    g.TakeWhile((int) Math.Ceiling(MaxPayload/g.First().Size) < cache.Count) 
     .Subscribe(cache.Add, 
    _ => { /* Rebuild Message Parts From Cache */ }); 
}); 

首先我通過它們的唯一ID創建一個分組的可觀察過濾消息,然後我試圖緩存每個組中的所有消息,直到我將它們全部收集起來,然後將它們分類並放在一起。上面似乎阻止g.First()。

我需要一種方法來計算從第一個(或任何)消息中獲得的數量,然而這樣做有困難。任何幫助?

回答

3

First是阻塞運算符(否則怎麼能返回T而不是IObservable<T>?)

我想用Scan(隨時間構建了一個彙總)可能是你所需要的。使用Scan,您可以將消息重構的「狀態」隱藏在「構建器」對象中。

MessageBuilder.IsComplete當它收到的消息的大小達到MaxPayload(或任何您的要求)時返回true。 MessageBuilder.Build()然後返回重建的消息。

我也已將您的「消息構建」代碼移動到SelectMany中,它將構建的消息保留在monad中。

(道歉格式化代碼爲擴展方法,我覺得很難讀/寫混合LINQ語法)

Receive 
    .Select(messageArgs => Serializer.Deserialize<Message>(
     new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message)))) 
    .GroupBy(message => message.Id) 
    .SelectMany(group => 
    { 
     // Use the builder to "add" message parts to 
     return group.Scan(new MessageBuilder(), (builder, messagePart) => 
     { 
      builder.AddPart(messagePart); 

      return builder; 
     }) 
     .SkipWhile(builder => !builder.IsComplete) 
     .Select(builder => builder.Build()); 
    }) 
    .Subscribe(OnMessageReceived); 
+0

酷的解決方案,我總是忘記的GroupBy – 2011-02-12 03:01:41