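Merging historical and live stock price data with Rx

2013-02-11

I'm trying out Rx because it seems like a good fit for our domain, but the learning curve has taken me by surprise.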

I need to knit together historical price data with live price data.

I'm trying to adapt the usual approach to this into the language of Rx:

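  1. Subscribe to the live prices immediately and start buffering the values I get back
  2. Start a request for historical price data (this needs to happen after subscribing to live prices, so that there are no gaps in our data)
  3. Publish the historical prices as they come back
  4. Once all the historical data has arrived, publish the buffered live data, dropping any values at the start of the buffer that overlap with our historical data
  5. Continue replaying data from the live price feed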
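
The five steps above can be modelled without Rx as a merge over in-memory sequences, which makes the overlap rule in step 4 concrete. This is a minimal sketch rather than Rx code: it assumes a hypothetical Tick record with a long timestamp, that timestamps are strictly increasing, and that bufferedLive holds whatever arrived between subscribing to live prices and the history request completing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick type: the question does not show its Tick definition.
public sealed record Tick(long Timestamp, decimal Price);

public static class HistoryThenLive
{
    // Steps 3-5: publish history, then the buffered live ticks that are newer
    // than the last historical tick, then everything that arrives afterwards.
    public static List<Tick> Merge(
        IReadOnlyList<Tick> history,
        IEnumerable<Tick> bufferedLive,
        IEnumerable<Tick> laterLive)
    {
        var result = new List<Tick>(history);

        // With no history nothing can overlap, so every buffered tick is kept.
        long lastHistorical = history.Count > 0
            ? history[history.Count - 1].Timestamp
            : long.MinValue;

        // Step 4: drop buffered live ticks that history already covers.
        result.AddRange(bufferedLive.Where(t => t.Timestamp > lastHistorical));

        // Step 5: ticks arriving after the buffer is drained need no filtering.
        result.AddRange(laterLive);
        return result;
    }
}
```

Because the buffered ticks are filtered once, in bulk, no overlap check is applied to laterLive; that is the property the question wants from a one-shot buffer.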

I have this disgusting and incorrect straw-man code, which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService 
    .For(symbol) 
    .Replay(/* Some appropriate buffer size */); 
live.Connect(); 

IObservable<Tick> historical = historyService.For(since, symbol); 

return new[] {historical, live} 
    .Concat() 
    .Where(TicksAreInChronologicalOrder()); 

private static Func<Tick,bool> TicksAreInChronologicalOrder() 
{ 
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw 
} 
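
One possible body for the stateful predicate, as a sketch: it assumes a hypothetical Tick record with a long timestamp and treats any tick that is not strictly newer than the last one seen as overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick type for illustration.
public sealed record Tick(long Timestamp, decimal Price);

public static class TickFilters
{
    // Each call returns a new predicate with its own "last seen" state.
    public static Func<Tick, bool> TicksAreInChronologicalOrder()
    {
        long? lastSeen = null;
        return tick =>
        {
            // Duplicate or out-of-order tick: treat it as overlap and drop it.
            if (lastSeen.HasValue && tick.Timestamp <= lastSeen.Value)
                return false;
            lastSeen = tick.Timestamp;
            return true;
        };
    }
}
```

The factory runs once, when the query is built, so every subscriber to the returned sequence shares the same lastSeen; in Rx, wrapping the whole query in Observable.Defer would give each subscription its own predicate. If timestamps are coarse (for example whole seconds), the strict comparison would also drop genuine ticks that share a timestamp with their predecessor.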

This has a number of drawbacks:

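  1. The appropriate replay buffer size is not known. An unbounded buffer isn't an option, because this is a long-running sequence. What we really need is some kind of one-time buffer that is flushed on the first call to Subscribe. If this exists in Rx, I can't find it.
  2. The replay buffer continues to exist even after we've switched to publishing live prices. We don't need the buffer at that point.
  3. Similarly, the predicate that filters out overlapping ticks isn't needed once we've skipped past the initial overlap between historical and live prices. What I really want to do is something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Is Wait(this IObservable<TSource>) useful here?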
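
The "lazily get last timestamp" idea in point 3 can be expressed with a captured variable, because SkipWhile only evaluates its predicate when the live part is actually enumerated. This sketch uses LINQ to Objects rather than Rx and a hypothetical Tick record; the same closure trick should carry over to Rx because Concat does not subscribe to its second source until the first one completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick type for illustration.
public sealed record Tick(long Timestamp, decimal Price);

public static class LazyBoundary
{
    public static IEnumerable<Tick> HistoryThenLive(IEnumerable<Tick> history, IEnumerable<Tick> live)
    {
        // Updated as history streams past; read later by the SkipWhile predicate.
        long lastHistorical = long.MinValue;

        // Built before any history has been seen, but the lambda only reads
        // lastHistorical once Concat moves on to the live sequence.
        var liveAfterHistory = live.SkipWhile(t => t.Timestamp <= lastHistorical);

        return history
            .Select(t => { lastHistorical = t.Timestamp; return t; })
            .Concat(liveAfterHistory);
    }
}
```

Once SkipWhile lets the first newer tick through, it stops evaluating the predicate altogether, which is the behaviour point 3 asks for.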

There must be a better way to do this, but I'm still waiting for my brain to grok Rx the way it groks FP.

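The other option I've considered is writing my own Rx extension: an ISubject that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?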


Would 'Switch()' work here? As in: 'historical.Switch(live)' – AlexFoxGill 2013-02-12 10:18:33

Answers


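For the record, here's what I ended up doing. I'm still very much an Rx learner, and I'm coming back to .Net after last seeing it at version 2.0. All feedback is gratefully received.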

The Ticks object used below may contain one or more tick values. The historical data service returns its data as several Ticks objects.

public class HistoricalAndLivePriceFeed : IPriceFeed 
{ 
    private readonly IPriceFeed history; 
    private readonly IPriceFeed live; 
    private readonly IClock clock; 

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live) 
     : this(history, live, new RealClock()) 
    { 
    } 
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock) 
    { 
     this.history = history; 
     this.live = live; 
     this.clock = clock; 
    } 

    public IObservable<Ticks> For(DateTime since, ISymbol symbol) 
    { 
     return Observable.Create<Ticks>(observer => 
     { 
      var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol)); 

      var definitelyInHistoricalTicks = clock.Now; 
      // Sleep to make sure that historical data overlaps our live data 
      // If we ever use a data provider with less fresh historical data, we may need to rethink this 
      clock.Wait(TimeSpan.FromSeconds(1)); 

      var liveStreamAfterEndOfHistoricalTicks = liveStream 
       .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks) 
       .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1)); 

      var subscription = history.For(since, symbol) 
       .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1)) 
       .Concat(liveStreamAfterEndOfHistoricalTicks) 
       .Subscribe(observer); 

      return liveStream.And(subscription); 
     }); 
    } 
} 
public static class CompositeDisposableExtensions 
{ 
    public static CompositeDisposable And(this IDisposable disposable, Action action) 
    { 
     return And(disposable, Disposable.Create(action)); 
    } 

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other) 
    { 
     return new CompositeDisposable(disposable, other); 
    } 
} 
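
The answer doesn't show the Ticks type. The following is a hypothetical stand-in, assuming whole-second long timestamps (which is what definitelyInHistoricalTicks + 1 suggests), to show how RemoveAtOrAfter on the history side and RemoveBefore on the live side split the data at the same boundary with neither a gap nor an overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the answer's Ticks chunk type:
// a batch of tick timestamps, in whole seconds.
public sealed class Ticks
{
    private readonly long[] timestamps;

    public Ticks(params long[] timestamps)
    {
        this.timestamps = timestamps;
    }

    public IReadOnlyList<long> Timestamps => timestamps;

    // Throws InvalidOperationException for an empty chunk.
    public long LastTimestamp => timestamps.Last();

    // Live side: keep only ticks at or after the boundary.
    public Ticks RemoveBefore(long boundary) =>
        new Ticks(timestamps.Where(t => t >= boundary).ToArray());

    // History side: keep only ticks strictly before the boundary.
    public Ticks RemoveAtOrAfter(long boundary) =>
        new Ticks(timestamps.Where(t => t < boundary).ToArray());
}
```

With a boundary of 10, second 10 comes only from history and second 11 only from the live feed, so the second that straddles the switch-over is never duplicated.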

It uses this Rx code, which I'm still not entirely comfortable with:

using System; 
using System.Collections.Generic; 
using System.Reactive.Disposables; 
using System.Reactive.Subjects; 

namespace My.Rx 
{ 
    /// <summary> 
    /// Buffers values from an underlying observable when no observers are subscribed. 
    /// 
    /// On Subscription, any buffered values will be replayed. 
    /// 
    /// Only supports one observer for now. 
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods 
    /// are hidden. It is not intended that Buffer should be used as an IObserver, 
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed. 
    /// </summary> 
    /// <typeparam name="TSource"></typeparam> 
    public class Buffer<TSource> : ISubject<TSource>, IDisposable 
    { 
     private readonly object gate = new object(); 
     private readonly Queue<TSource> queue = new Queue<TSource>(); 

     private bool isDisposed; 
     private Exception error; 
     private bool stopped; 
     private IObserver<TSource> observer = null; 
     private IDisposable subscription; 

     public static Buffer<TSource> StartBuffering(IObservable<TSource> observable) 
     { 
      return new Buffer<TSource>(observable); 
     } 

     private Buffer(IObservable<TSource> observable) 
     { 
      subscription = observable.Subscribe(this); 
     } 

     void IObserver<TSource>.OnNext(TSource value) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        queue.Enqueue(value); 
       else 
        observer.OnNext(value); 
      } 
     } 

     void IObserver<TSource>.OnError(Exception error) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        this.error = error; 
       else 
        observer.OnError(error); 
       stopped = true; 
      } 
     } 

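     void IObserver<TSource>.OnCompleted() 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       stopped = true; 
       // Forward completion straight away if an observer is attached; 
       // otherwise Subscribe() replays it from the stopped flag. 
       if (!IsBuffering) 
        observer.OnCompleted(); 
      } 
     } 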

     public IDisposable Subscribe(IObserver<TSource> observer) 
     { 
      lock (gate) 
      { 
       if (isDisposed) 
        throw new ObjectDisposedException(string.Empty); 

       if (this.observer != null) 
        throw new NotImplementedException("A Buffer can currently only support one observer at a time"); 

       while (queue.Count > 0) 
       { 
        observer.OnNext(queue.Dequeue()); 
       } 

       if (error != null) 
        observer.OnError(error); 
       else if (stopped) 
        observer.OnCompleted(); 

       this.observer = observer; 
       return Disposable.Create(() => 
              { 
               lock (gate) 
               { 
                      // Go back to buffering 
                this.observer = null; 
               } 
              }); 
      } 
     } 

     private bool IsBuffering 
     { 
      get { return observer == null; } 
     } 


     public void Dispose() 
     { 
      lock (gate) 
      { 
       // Guard against a second Dispose() call, after subscription was nulled 
       if (subscription != null) 
        subscription.Dispose(); 

       isDisposed = true; 
       subscription = null; 
       observer = null; 
      } 
     } 
    } 
} 

It passes these tests (I haven't bothered checking thread safety yet):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world"); 

[Test] 
public void ReplaysBufferedValuesToFirstSubscriber() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    Assert.That(observed, Is.EquivalentTo(new []{1,2})); 
} 

[Test] 
public void PassesNewValuesToObserver() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 
    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    underlying.OnNext(1); 
    underlying.OnNext(2); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 })); 
} 


[Test] 
public void DisposesOfSubscriptions() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)) 
     .Dispose(); 

    underlying.OnNext(1); 

    Assert.That(observed, Is.Empty); 
} 

[Test] 
public void StartsBufferingAgainWhenSubscriptionIsDisposed() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    // These should be buffered 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var firstSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add))) 
    { 
     // Should be passed through to first subscription 
     underlying.OnNext(3); 
    } 
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 })); 

    // First subscription has been disposed- 
    // we should be back to buffering again 
    underlying.OnNext(4); 
    underlying.OnNext(5); 

    var secondSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add))) 
    { 
     // Should be passed through to second subscription 
     underlying.OnNext(6); 
    } 
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6})); 
} 

[Test] 
public void DoesNotSupportTwoConcurrentObservers() 
{ 
    // Use .Publish() if you need to do this 

    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    buffer.Subscribe(Observer.Create<int>(i => { })); 

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void CannotBeUsedAfterDisposal() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void ReplaysBufferedError() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    var observed = new List<int>(); 
    Exception foundException = null; 
    buffer.Subscribe(
     observed.Add, 
     e => foundException = e); 

    Assert.That(observed, Is.EquivalentTo(new []{1})); 
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletion() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(
     observed.Add, 
     () => completed=true); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1 })); 
    Assert.True(completed); 
} 

[Test] 
public void ReplaysBufferedErrorToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    // Drain value queue 
    buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose(); 

    var observed = new List<int>(); 
    Exception exceptionEncountered = null; 
    buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose(); 

    Assert.That(observed, Is.Empty); 
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletionToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    // Drain value queue 
    buffer.Subscribe(Observer.Create<int>(i => { })).Dispose(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose(); 

    Assert.That(observed, Is.Empty); 
    Assert.True(completed); 
} 



[Test] 
public void DisposingOfBufferDisposesUnderlyingSubscription() 
{ 
    var underlyingSubscriptionWasDisposed = false; 
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed= true )); 

    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.True(underlyingSubscriptionWasDisposed); 
} 

If your historical and live data are both time- or scheduler-based, that is, the event streams look like this over time:

|----------------------------------------------------> time 
    h h h h h h         historical 
       l l l l l l      live 

you can use a simple TakeUntil construct:

var historicalStream = <fetch historical data>; 
var liveStream = <fetch live data>; 

var mergedWithoutOverlap = 
    // pull from historical 
    historicalStream 
     // until we start overlapping with live 
     .TakeUntil(liveStream) 
     // then continue with live data 
     .Concat(liveStream); 
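
To make the TakeUntil semantics concrete, here is a model on a virtual timeline using LINQ to Objects. It assumes a hypothetical Timed<T> record (emission time plus payload) and that liveStream can be re-subscribed without losing its first element, for example because it is replayed; with a purely hot live stream, Concat only subscribes after TakeUntil has fired and could miss the event that triggered it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event type: Time is when the event is emitted, Value is the payload.
public sealed record Timed<T>(long Time, T Value);

public static class TakeUntilModel
{
    // Models historical.TakeUntil(live).Concat(live), assuming live is replayable.
    // A historical event at the same time as the first live event counts as overlap.
    public static List<T> Merge<T>(IEnumerable<Timed<T>> historical, IReadOnlyList<Timed<T>> live)
    {
        long firstLive = live.Count > 0 ? live[0].Time : long.MaxValue;

        // TakeUntil: historical events stop at the first live event.
        var result = historical
            .TakeWhile(h => h.Time < firstLive)
            .Select(h => h.Value)
            .ToList();

        // Concat: then every live event.
        result.AddRange(live.Select(l => l.Value));
        return result;
    }
}
```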

If you get all of the historical data in one go, as an IEnumerable<T>, you can use StartWith combined with your other logic:

var historicalData = <get IEnumerable of tick data>; 
var liveData = <get IObservable of tick data>; 

var mergedWithOverlap = 
    // the observable is the "long running" feed 
    liveData 
    // But we'll inject the historical data in front of it 
    .StartWith(historicalData) 
    // Perform filtering based on your needs 
    .Where(....); 
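
When the whole history is in memory, its last timestamp is known before any live tick arrives, so the overlap filter can be stateless. A LINQ to Objects sketch with a hypothetical Tick record; in Rx terms it corresponds to applying the Where to liveData before StartWith, since a filter placed after StartWith would also see the historical ticks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick type for illustration.
public sealed record Tick(long Timestamp, decimal Price);

public static class StartWithModel
{
    // Models liveData.Where(t => t.Timestamp > cutoff).StartWith(historicalData).
    public static IEnumerable<Tick> Merge(IReadOnlyCollection<Tick> historical, IEnumerable<Tick> live)
    {
        // With no history there is nothing to overlap, so every live tick passes.
        long cutoff = historical.Count > 0 ? historical.Max(t => t.Timestamp) : long.MinValue;
        return historical.Concat(live.Where(t => t.Timestamp > cutoff));
    }
}
```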

How about something like:

public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector) 
{ 
    var replaySubject = new ReplaySubject<T>(); 
    live.Subscribe(replaySubject); 
    return history.Concat(replaySubject).Distinct(selector); 
} 

It uses a sequence id and Distinct to filter out the duplicates.
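
The Distinct(selector) step behaves like the following LINQ to Objects sketch: every key ever seen is kept in a set and never released, which is also why memory grows with the feed. The helper name DistinctByKey is hypothetical (newer .NET versions ship Enumerable.DistinctBy with the same behaviour), and the ids mirror the test below without the scheduler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DistinctModel
{
    // Yields an item only the first time its key is seen.
    // The set is never cleared, so it grows for as long as the feed runs.
    public static IEnumerable<T> DistinctByKey<T, TKey>(IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        var seen = new HashSet<TKey>();
        foreach (var item in source)
        {
            if (seen.Add(keySelector(item)))
                yield return item;
        }
    }
}
```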

And the corresponding test:

var testScheduler = new TestScheduler(); 

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }), 
    OnNext(2L, new PriceTick { PriceId = 2 }), 
    OnNext(3L, new PriceTick { PriceId = 3 }), 
    OnNext(4L, new PriceTick { PriceId = 4 }), 
    OnCompleted<PriceTick>(5L)); 

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }), 
    OnNext(2L, new PriceTick { PriceId = 4 }), 
    OnNext(3L, new PriceTick { PriceId = 5 }), 
    OnNext(4L, new PriceTick { PriceId = 6 }), 
    OnNext(5L, new PriceTick { PriceId = 7 }), 
    OnNext(6L, new PriceTick { PriceId = 8 }), 
    OnNext(7L, new PriceTick { PriceId = 9 }) 
    ); 


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId)); 
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId),() => Console.WriteLine("C")); 

var combined = live.CombineWithHistory(history, t => t.PriceId); 

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId)); 

testScheduler.AdvanceTo(6L); 

If you run this test, combined emits the price ticks with ids 1 to 8.


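Thanks Dave, this taught me some nice new tricks with schedulers. The problems with using Distinct in our case are: 1) Because the data comes back in chunks of one or more ticks rather than as single values, we would have to SelectMany before calling the selector. Given our large data volumes and performance requirements, that isn't possible. 2) To save memory, our ticks have a timestamp precision of 1 second. We may get several ticks within the same second, so it isn't possible to write a semantically correct selector function without state. – 2013-04-24 18:18:36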


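I actually improved on this answer when I did it myself, but forgot to post the code. In the end I basically queued the ticks and then flushed them when the history completed. You need some kind of sequence number to make sure you don't lose any data. – 2013-04-24 21:52:27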


Also, the code I gave doesn't meet your requirements, because the replay subject will hold the entire history. – 2013-04-24 21:53:43


A convenient way to handle both memory use and overlapping trades (correctness). I'd welcome your feedback:

var tradeIds = new HashSet<string>(); 
var replayQuotationTrades = new ReplaySubject<IntradayTrade>(); 
var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades); 
return _historyTrades 
       .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler) 
       .Do(t => tradeIds.Add(t.TradeId)) 
       .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades)) 
       .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId))) 
       .Finally(tradeIds.Clear) 
       .Concat(_quotationTrades) 
       .Subscribe(observer);
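
The same idea, traced with in-memory sequences: trade ids are collected while history streams, used to filter only the replayed live trades, and then discarded, so the set does not grow for the lifetime of the feed. A sketch with a hypothetical Trade record; it does not model DelaySubscription, the background scheduler, or threading.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical trade type; only TradeId matters for de-duplication.
public sealed record Trade(string TradeId, decimal Price);

public static class TradeIdDedup
{
    // history:  trades from the history service
    // replayed: live trades buffered while history was loading
    // later:    live trades arriving after the buffer has been drained
    public static List<Trade> Merge(IEnumerable<Trade> history, IEnumerable<Trade> replayed, IEnumerable<Trade> later)
    {
        var tradeIds = new HashSet<string>();
        var result = new List<Trade>();

        // Like .Do(t => tradeIds.Add(t.TradeId)) on the history stream.
        foreach (var trade in history)
        {
            tradeIds.Add(trade.TradeId);
            result.Add(trade);
        }

        // Like .Concat(replay.Where(t => !tradeIds.Contains(t.TradeId))).
        result.AddRange(replayed.Where(t => !tradeIds.Contains(t.TradeId)));

        // Like .Finally(tradeIds.Clear): the ids are only needed for the overlap.
        tradeIds.Clear();

        // Like the final .Concat(_quotationTrades): no filtering from here on.
        result.AddRange(later);
        return result;
    }
}
```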