我有一個長時間運行的任務(說一個Observable<Integer>
),我想在應用程序中儘可能多次觸發。我對處理以各種方式發送的事件的任務有多個「視圖」。我的整個應用程序中只有一個subscribe
。如何控制RxJava 2中的訂閱計數?
如何確保長時間運行的任務僅針對每個訂閱觸發一次,並且僅在訂閱需要時觸發?
爲了讓事情變得更具體,這裏是一個單元測試:
@Test
public void testSubscriptionCount() {
final Counter counter = new Counter();
// Some long running tasks that should be triggered once per subscribe
final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5)
.doOnSubscribe(subscription -> {
counter.increment();
});
// Some "view" on the long running task
final Observable<Integer> b = a.filter(x -> x % 2 == 0);
// Another "view" on the long running task
final Observable<Integer> c = a.filter(x -> x % 2 == 1);
// A view on the views
final Observable<Integer> d = Observable.zip(b, c, (x, y) -> x + y);
d.toList().blockingGet();
assertEquals(1, counter.count); // Fails, counter.count == 2
}
我想a
當其意見(b
,c
或d
)一個訂閱才能觸發,但每次訂閱也只有一次。
在上面的代碼中,訂閱發生兩次(我認爲d
觸發b
和c
,這兩個都獨立觸發a
)。
添加.share()
沒有解決不了的問題(雖然我認爲這是沿着正確的路線):
// Some long running tasks that should be triggered once per subscribe
final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5)
.doOnSubscribe(subscription -> counter.increment())
.share();
java.lang.AssertionError:
Expected :1
Actual :2
我不清楚「只在需要時觸發」這個短語的意思。你有單元測試嗎? –
我的意思是observable只能在有人直接訂閱它或者從它構建的其他observable時才運行它的發射器代碼。 – sdgfsdh
你所描述的是觀察者的工作方式。你有什麼理由相信他們不這樣工作? –