0

我正在嘗試創建一個分析股票價格的工具。如何正確重組分組觀察值?

我得到了不同股票的價格數據流,我希望有一個可觀察的事件,當它收到一個新的,獨特的和完整的價格集合時發出事件。

我的計劃:將流分組爲不同股票的不同子流,並重新組合它們的最新值。

比方說,我有一個事件流是這樣的:

from rx import Observable 

stock_events = [ 
    {'stock': 'A', 'price': 15}, 
    {'stock': 'A', 'price': 16}, 
    {'stock': 'B', 'price': 24}, 
    {'stock': 'C', 'price': 37}, 
    {'stock': 'A', 'price': 18}, 
    {'stock': 'D', 'price': 42}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'C', 'price': 31}, 
    {'stock': 'D', 'price': 44} 
] 

price_source = Observable.from_list(stock_events) 

這是我第一次(天真)做法:

a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed() 
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed() 
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed() 
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed() 

(Observable 
    .combine_latest(a_source, b_source, c_source, d_source, lambda *x: x) 
    .subscribe(print)) 

這正確給我:

​​

但是,我覺得這應該更好地處理group_by,而不是幾個過濾器,所以在這裏的重新寫入:

(price_source 
.group_by(lambda e: e['stock']) 
.map(lambda obs: obs.distinct_until_changed()) 
.combine_latest(lambda *x: x) 
.subscribe(print)) 

但是這一次,我得到:

(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,) 

有什麼我錯過這裏?我如何「解開」嵌套的觀察對象?

+0

我不確定GroupBy是否適合您的情況。既然你不想輸出一個元素,直到所有的股票都已經發出,這意味着你知道股票代號是什麼。我認爲最新的Combine最適合您的使用案例。您可能想要通過發佈來共享底層子。 – user630190

回答

0

如果你確實想使用groupby,它會在C#下面。這並不符合您對「完整」設置的要求。根據評論,懷疑CombineLatest在這裏會更好。

price_source.GroupBy(x => x.Stock) 
      .Select(gp => gp.DistinctUntilChanged(x => x.Price)) 
      .SelectMany(x => x) 
      .Subscribe(s => Console.WriteLine($"{s.Stock} : {s.Price}"));