我有一個帶有實時數據的流,以及基本上分隔屬於一起的實時數據部分的流。現在當有人訂閱實時數據流時,我想重播他們的實時數據。但是我不想記住所有的實時數據,只有自上一次其他流發出值以來的部分。通過可觀察限制重播緩衝區
There is an issue這將解決我的問題,因爲有一個重播操作符正是我想要的(或至少我認爲)。
目前如何輕鬆完成此操作?有沒有比以下更好的方法?
private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
private readonly List<TItem> cached = new List<TItem>();
private readonly IObservable<TDelimiter> delimitersObservable;
private readonly IObservable<TItem> itemsObservable;
public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
{
this.itemsObservable = itemsObservable;
this.delimitersObservable = delimitersObservable;
}
public IDisposable Subscribe(IObserver<TItem> observer)
{
lock (cached)
{
cached.ForEach(observer.OnNext);
}
return itemsObservable.Subscribe(observer);
}
public IDisposable Connect()
{
var delimiters = delimitersObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Clear();
}
});
var items = itemsObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Add(p);
}
});
return Disposable.Create(
() =>
{
items.Dispose();
delimiters.Dispose();
lock (cached)
{
cached.Clear();
}
});
}
public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}
只是一個想法的優勢......不會'ConcurrentBag'是更好的選擇比使用''上緩存lock' '?我的意思是,這就是它的設計目的...... –
toadflakz
2014-12-05 09:25:02
@toadflakz - AFAIK,ConcurrentBag不保證保留添加順序(如果項目可觀察行爲正確,我使用列表以正確順序獲取項目)。 ConcurrentQueue可以解決這個問題,但清除列表比清除ConcurrentQueue更容易。 – 2014-12-05 09:29:22
感謝您的解釋 - 我對實時數據開發感興趣,因此有經驗的人員對代碼設計的決策見解表示讚賞。 – toadflakz 2014-12-05 09:34:15