2017-02-16 75 views
0

我正在開發一個解決方案,例如爲用戶發生高速率更改。 我需要的是記錄每個更改,延遲通知,然後返回不同的更改,例如已更改用戶的用戶ID。 我想出了下面的代碼使用RX:可觀察緩衝區

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .Buffer(EVENT_BUFFER_DELAY) 
    .Where(buffer => buffer.Count > 0)     
    .Subscribe(OnEventBufferProcess); 

這似乎正常工作和我得到

userEventSubject.OnNext(userId); 

我的問題中添加的所有值是:我能明顯的變化例如,當具有多個具有相同用戶標識值的OnNext時,我不希望生成的緩衝區包含重複項。當然,我可以在訂閱處理程序中使用不同的值,但是我想知道這是否可以在rx級別上完成?我嘗試了獨特的方法,但仍然可以獲得所有的值。

由於我只想跟蹤在延遲期間所做的更改,返回訂閱處理程序並重新開始,是否需要清除userEventSubject?

回答

3

如果您使用Observable.Distinct方法,您將檢查不同的IList<int> s,這確實不是您想要的行爲。

看來你需要將獨特的元素應用到緩衝區內的元素。

可以使用Observable.Select方法將此每一個緩衝區:

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .Buffer(EVENT_BUFFER_DELAY) 
    .Where(buffer => buffer.Count > 0) 
    .Select(buffer => buffer.Distinct()) // distinct elements in each buffer 
    .Subscribe(OnEventBufferProcess); 

要知道,當你使用Distinct()方法,你需要警惕你是如何檢查的平等,如果你是參考工作類型 - 你可能需要實現某種相等比較器。在這種情況下,這不是問題,因爲您正在使用int s。

+0

是的,我明白,我需要有一個平等的比較器參考類型。但是有一個問題,OnEventBufferProcess是異步的嗎?正如我理解的正確,而OnEventBufferProcess仍然工作任何調用OnNext將被阻止? – NullReference

+1

@NullReference是的,關於阻止你是正確的。有關於通過異步方法訂閱的一些問題,解決方案取決於您要查找的確切行爲。看到這裏例如:http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async – TheInnerLight

+0

什麼是需要做的處理單獨的線程/線程所以OnNext永遠不會被阻止。據我所知,ObserveOn將允許我指定調度器應在哪些回調函數上運行。 因此,ObserveOn(ThreadPoolScheduler.Instance)將使回調在一個ThreadPool線程上運行,並且只要ThreadPool具有可用線程就不會阻塞? 從測試我可以看出,它似乎按預期工作:) – NullReference

0

你可以使用DistinctUntilChanged擴展方法:

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .DistinctUntilChanged() 
    .Buffer(EVENT_BUFFER_DELAY)  
    .Where(buffer => buffer.Count > 0)     
    .Subscribe(OnEventBufferProcess);