2017-07-02 85 views
2

的問題,我試圖解決Rx.Net - 獲取股票價格的變化,並進行處理

  1. 獲取股票蜱
  2. 始終考慮最新股價
  3. 每個X秒取快照的蜱併發送處理

所以我有一個Observable股票蜱的來源。它只發送我感興趣的股票的蜱。我需要做的是收到這個股票價格,並在每個x秒(例如,讓我們說每3秒)發送一個快照處理價格。如果在3秒鐘內收到同一只股票的2個蜱蟲,我只需要最新的蜱蟲。這個處理計算量很大,所以如果可能的話,我希望避免兩次處理相同的股票價格。

舉一個例子。

假設在序列的開始,我收到2個刻度 - >MSFT:1 $,GOOG:2 $

在接下來的3秒內我什麼也沒收到,所以MSFT & GOOG應該發送處理滴答。

現在下一秒我收到新的節拍 - >MSFT:1 $,GOOG:3 $,INTL:3 $

再次讓我們接下來的3秒沒有一樣是在範圍內承擔

這裏。 ,因爲MSFT價格沒有變化(它仍然是1美元),只有GOOG & INTL應該發送處理。

而這一直重複一整天。

現在,我認爲Rx幫助解決這種問題,容易&優雅的方式。但我有一個問題,有適當的查詢。 這是我到目前爲止,會盡量解釋它做什麼,什麼是它的問題

var finalQuery = 
       from priceUpdate in **Observable<StockTick>** 
       group priceUpdate by priceUpdate.Stock into grouped 
       from combined in Observable.Interval(TimeSpan.FromSeconds(3)) 
         .CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t }) 
       group combined by new { combined.Interval } into combined 
       select new 
       { 
        Interval = combined.Key.Interval, 
        PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price)) 
       }; 

      finalQuery 
       .SelectMany(combined => combined.PEI) 
       .Distinct(pu => new { pu.Stock, pu.Price }) 
       .Subscribe(priceUpdate => 
       { 
        Process(priceUpdate); 
       }); 

public class StockTick 
{ 
    public StockTick(string stock, decimal price) 
    {  
     Stock = stock; 
     Price = price; 
    } 
    public string Stock {get;set;} 
    public decimal Price {get;set;} 
} 

因此,這得到了股票價格,團體它通過股票,再結合最新的這組

ed序列與Observable.Interval。通過這種方式,我試圖確保只處理股票的最新股票價格,並且每3秒啓動一次。

然後它再次按照時間間隔對它進行分組,結果我有通過的每3秒間隔的一組序列。

作爲最後一步,我使用SelectMany將此序列扁平化爲股票價格更新的順序,並且我正在應用Distinct以確保同一個股票的相同價格不會被處理兩次。

這個查詢有2個問題我不喜歡。首先是我不喜歡雙人組合 - 有什麼辦法可以避免嗎?其次 - 用這種方法,我必須逐一處理價格,我真正想要的是快照 - 這是在3秒鐘內,無論我有什麼我會扣上來發送處理,但無法弄清楚如何到扣起來

我很樂意提供其他方式解決此問題的建議,但我寧願留在Rx內,除非真的有更好的方法。

+0

在你的例子中,你不會發送微軟,因爲它沒有改變,但Google和你也沒有發送。你能澄清一下嗎? – Enigmativity

+0

@Enigmativity道歉混淆,意味着寫**谷歌:3 $ ** ...將更新現在的帖子 – Michael

回答

3

有兩件事情:

  1. 你要採取Sample運營商的優勢:
  2. 你可能想DistinctUntilChanged而不是Distinct。如果您使用Distinct,那麼如果MSFT從$ 1變爲$ 2,然後又變回$ 1,您將不會在第三個記號上得到一個事件。

我想,你的解決方案將是這個樣子:

IObservable<StockTick> source; 
source 
    .GroupBy(st => st.Stock) 
    .Select(stockObservable => stockObservable 
     .Sample(TimeSpan.FromSeconds(3)) 
     .DistinctUntilChanged(st => st.Price) 
    ) 
    .Merge() 
    .Subscribe(st => Process(st)); 

編輯Distinct性能問題):

每個Distinct運營商必須在其內保持,充分截然不同的歷史。如果你有一個高價股,比如AMZN,到目前爲止,它的價格從958美元到974美元不等,那麼你可能會得到很多數據。這是〜1600個可能的數據點,必須坐在內存中,直到您退訂Distinct。它最終還會降低性能,因爲每個AMZN勾號必須在經歷之前與1600個現有數據點進行比較。如果這是一個長期運行的過程(跨越多個交易日),那麼您將得到更多的數據點。

鑑於N股,你有N Distinct運營商需要相應地操作。乘以N股的行爲,你有一個不斷增加的問題。

+0

感謝您的答案,讓我試試這個並回饋給您。在一個相對說明中,我確實希望'不同',正如我所提到的那樣,我處理價格,所以同一價格不應該在收到價格時處理兩次。 – Michael

+0

您可能想要小心那裏的內存影響。 – Shlomo

+0

內存影響?你能否詳細說明一下? – Michael