0
我正在嘗試創建一個分析股票價格的工具。如何正確重組分組觀察值?
我得到了不同股票的價格數據流,我希望有一個可觀察的事件,當它收到一個新的,獨特的和完整的價格集合時發出事件。
我的計劃:將流分組爲不同股票的不同子流,並重新組合它們的最新值。
比方說,我有一個事件流是這樣的:
from rx import Observable
stock_events = [
{'stock': 'A', 'price': 15},
{'stock': 'A', 'price': 16},
{'stock': 'B', 'price': 24},
{'stock': 'C', 'price': 37},
{'stock': 'A', 'price': 18},
{'stock': 'D', 'price': 42},
{'stock': 'B', 'price': 27},
{'stock': 'B', 'price': 27},
{'stock': 'C', 'price': 31},
{'stock': 'D', 'price': 44}
]
price_source = Observable.from_list(stock_events)
這是我第一次(天真)做法:
a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed()
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed()
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed()
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed()
(Observable
.combine_latest(a_source, b_source, c_source, d_source, lambda *x: x)
.subscribe(print))
這正確給我:
但是,我覺得這應該更好地處理group_by
,而不是幾個過濾器,所以在這裏的重新寫入:
(price_source
.group_by(lambda e: e['stock'])
.map(lambda obs: obs.distinct_until_changed())
.combine_latest(lambda *x: x)
.subscribe(print))
但是這一次,我得到:
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,)
有什麼我錯過這裏?我如何「解開」嵌套的觀察對象?
我不確定GroupBy是否適合您的情況。既然你不想輸出一個元素,直到所有的股票都已經發出,這意味着你知道股票代號是什麼。我認爲最新的Combine最適合您的使用案例。您可能想要通過發佈來共享底層子。 – user630190