2017-10-28 320 views
0

我遇到了RX.net的背壓問題,我找不到解決方案。我有一個可觀察的實時日誌消息流。RX.net的DropQueue機制

var logObservable = /* Observable stream of log messages */ 

,我想通過他們正在通過網絡發送之前,該串行從logObservable實時日誌消息的TCP接口暴露。所以,我做到以下幾點:

foreach (var message in logObservable.ToEnumerable()) 
{ 
    // 1. Serialize message 
    // 2. Send it over the wire. 
} 

.ToEnumerable()問題出現,如果回壓的情況發生例如如果另一端的客戶端暫停流。問題是.ToEnumerable()緩存導致大量內存使用的項目。我正在尋找一種類似於DropQueue的機制,它只緩衝最後10條消息,例如

var observableStream = logObservable.DropQueue(10).ToEnumerable(); 

這是解決此問題的正確途徑嗎?你知道實施這樣的機制,以避免可能的背壓問題?

+0

'。取(10).toenumerable()'會的工作不是嗎? –

+0

我想通過電線連續傳輸日誌消息。如果我按照你的建議做,它不僅需要10條日誌消息,然後完成可觀察流?我試圖解決的問題是,如果客戶端速度太慢而無法檢索日誌消息或暫停流,它應該只緩存10件物品,而不是無數的物品。 – SOK

+0

'.Throttle(...)'或'.Sample(..)'? – Enigmativity

回答

0

DropQueue實現:

public static IEnumerable<TSource> ToDropQueue<TSource>(
     this IObservable<TSource> source, 
     int queueSize, 
     Action backPressureNotification = null, 
     CancellationToken token = default(CancellationToken)) 
    { 
     var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize); 
     var isBackPressureNotified = false; 

     var subscription = source.Subscribe(
      item => 
      { 
       var isBackPressure = queue.Count == queue.BoundedCapacity; 

       if (isBackPressure) 
       { 
        queue.Take(); // Dequeue an item to make space for the next one 

        // Fire back-pressure notification if defined 
        if (!isBackPressureNotified && backPressureNotification != null) 
        { 
         backPressureNotification(); 
         isBackPressureNotified = true; 
        } 
       } 
       else 
       { 
        isBackPressureNotified = false; 
       } 

       queue.Add(item); 
      }, 
      exception => queue.CompleteAdding(), 
      () => queue.CompleteAdding()); 

     token.Register(() => { subscription.Dispose(); }); 

     using (new CompositeDisposable(subscription, queue)) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
      { 
       yield return item; 
      } 
     } 
    }