我正在開發一個解決方案,例如爲用戶發生高速率更改。 我需要的是記錄每個更改,延遲通知,然後返回不同的更改,例如已更改用戶的用戶ID。 我想出了下面的代碼使用RX:可觀察緩衝區
private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
.Buffer(EVENT_BUFFER_DELAY)
.Where(buffer => buffer.Count > 0)
.Subscribe(OnEventBufferProcess);
這似乎正常工作和我得到
userEventSubject.OnNext(userId);
我的問題中添加的所有值是:我能明顯的變化例如,當具有多個具有相同用戶標識值的OnNext時,我不希望生成的緩衝區包含重複項。當然,我可以在訂閱處理程序中使用不同的值,但是我想知道這是否可以在rx級別上完成?我嘗試了獨特的方法,但仍然可以獲得所有的值。
由於我只想跟蹤在延遲期間所做的更改,返回訂閱處理程序並重新開始,是否需要清除userEventSubject?
是的,我明白,我需要有一個平等的比較器參考類型。但是有一個問題,OnEventBufferProcess是異步的嗎?正如我理解的正確,而OnEventBufferProcess仍然工作任何調用OnNext將被阻止? – NullReference
@NullReference是的,關於阻止你是正確的。有關於通過異步方法訂閱的一些問題,解決方案取決於您要查找的確切行爲。看到這裏例如:http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async – TheInnerLight
什麼是需要做的處理單獨的線程/線程所以OnNext永遠不會被阻止。據我所知,ObserveOn將允許我指定調度器應在哪些回調函數上運行。 因此,ObserveOn(ThreadPoolScheduler.Instance)將使回調在一個ThreadPool線程上運行,並且只要ThreadPool具有可用線程就不會阻塞? 從測試我可以看出,它似乎按預期工作:) – NullReference