我正在尋找一種方法將多個訂閱者附加到RxJava Observable流,每個訂閱者異步處理髮出的事件。RxJava併發與多個訂閱者和事件
我第一次嘗試使用.flatMap(),但似乎沒有任何後續用戶工作。所有訂閱者都在同一個線程上處理事件。
.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread()))
什麼結束了工作,通過創建一個新的可觀察每個耗時在一個新的線程每個事件:
Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
});
});
輸出:
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-2=>2
s1=>RxNewThreadScheduler-3=>3
並與多個用戶的最終結果:
ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
.publish();
e.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
});
});
e.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
});
});
e.connect();
輸出:
s2=>RxNewThreadScheduler-4=>2
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-3=>2
s2=>RxNewThreadScheduler-6=>3
s2=>RxNewThreadScheduler-2=>1
s1=>RxNewThreadScheduler-5=>3
但是,這似乎有點笨重。有沒有更優雅的解決方案,或者RxJava不是一個很好的用例?
我原本想這(兩.observeOn和.subscribeOn),但每當我介紹了我的用戶計算,除幾毫秒的時間花費了如Thread.sleep(100),一切都結束了在同一個線程上再次處理。 –