2014-12-05 53 views
4

我有一個帶有實時數據的流,以及基本上分隔屬於一起的實時數據部分的流。現在當有人訂閱實時數據流時,我想重播他們的實時數據。但是我不想記住所有的實時數據,只有自上一次其他流發出值以來的部分。通過可觀察限制重播緩衝區

There is an issue這將解決我的問題,因爲有一個重播操作符正是我想要的(或至少我認爲)。

目前如何輕鬆完成此操作?有沒有比以下更好的方法?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem> 
{ 
    private readonly List<TItem> cached = new List<TItem>(); 
    private readonly IObservable<TDelimiter> delimitersObservable; 
    private readonly IObservable<TItem> itemsObservable; 
    public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable) 
    { 
     this.itemsObservable = itemsObservable; 
     this.delimitersObservable = delimitersObservable; 
    } 

    public IDisposable Subscribe(IObserver<TItem> observer) 
    { 
     lock (cached) 
     { 
      cached.ForEach(observer.OnNext); 
     } 

     return itemsObservable.Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     var delimiters = delimitersObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
       }); 
     var items = itemsObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Add(p); 
        } 
       }); 
     return Disposable.Create(
      () => 
       { 
        items.Dispose(); 
        delimiters.Dispose(); 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
      }); 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

只是一個想法的優勢......不會'ConcurrentBag '是更好的選擇比使用''上緩存lock' '?我的意思是,這就是它的設計目的...... – toadflakz 2014-12-05 09:25:02

+1

@toadflakz - AFAIK,ConcurrentBag不保證保留添加順序(如果項目可觀察行爲正確,我使用列表以正確順序獲取項目)。 ConcurrentQueue可以解決這個問題,但清除列表比清除ConcurrentQueue更容易。 – 2014-12-05 09:29:22

+0

感謝您的解釋 - 我對實時數據開發感興趣,因此有經驗的人員對代碼設計的決策見解表示讚賞。 – toadflakz 2014-12-05 09:34:15

回答

4

這是做你想做的嗎?它使所有的鎖定和競爭條件到Rx利弊:)

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T> 
{ 
    private IConnectableObservable<IObservable<T>> _source; 

    public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter) 
    { 
    _source = source 
     .Window(delimiter) // new replay window on delimiter 
     .Select<IObservable<T>,IObservable<T>>(window => 
     { 
     var replayWindow = window.Replay(); 

     // immediately connect and start memorizing values 
     replayWindow.Connect(); 

     return replayWindow; 
     }) 
     .Replay(1); // remember the latest window 
    } 

    IDisposable Connect() 
    { 
    return _source.Connect(); 
    } 

    IDisposable Subscribe(IObserver<T> observer) 
    { 
    return _source 
     .Concat() 
     .Subscribe(observer); 
    } 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

這樣比較好,但我仍然不喜歡'IConnectableObservable '必須實施的事實。這不應該是真的,你不覺得嗎?你在這裏很接近避免它,但仍然是迄今爲止...... ;-) – 2014-12-06 00:44:45

+0

至少,[this](https://github.com/Reactive-Extensions/Rx.NET/issues/54) ,但「ReplaySubject 」的「無功/被動」版本將是最好的。它可以在一行代碼中解決這個問題:'return xs.Multicast(new ReplaySubject (ys));' – 2014-12-06 00:47:36

+0

@DaveSexton - 完全同意,我對這個問題的反應是'IConnectableObservable'的工廠確實是對Rx的歡迎補充。 – 2014-12-06 09:21:45