2014-04-16 41 views
0

我有一個PublishSubject,我訂閱了兩次。第一個訂閱者只計算處理的項目數量,而且這個值總是與我通過觀察者發送的內容相匹配。然而,另一位用戶正在使用一個緩衝區,我經常(75%)沒有收到所有通過觀察者的項目。我使用緩衝區是否錯誤?我停止發送給觀察者以確保處理所有項目後,我等待的時間超過了時間。rx-java緩衝區丟失項目

Integer downloads1 = 0; 
Integer downloads2 = 0; 
PublishSubject<Object> subject = PublishSubject.create(); 
// this subscriber count matches the expected 
subject.subscribe(s -> { 
    synchronized (downloads1) { 
    downloads1 += 1; 
    } 
}); 
// this subscriber seems to miss items about 75% of the time 
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> { 
    synchronized (downloads2) { 
    downloads2 += list.size(); 
    } 
}); 
+0

源Observable在哪裏?既然你使用'synchronized',我猜你的源代碼Observable可能有一些問題。您需要確保您的Observable在同一線程中發送消息或進行同步。 – zsxwing

回答

0

也許你遇到這個錯誤:https://github.com/Netflix/RxJava/issues/534

而且順便說一句,而不是訂閱,你應該使用reduce(R initialValue, Func2<R,? super T,R> accumulator)爲0的初始值,那麼你不需要自己做任何同步。