我想弄清楚Rx運算符的執行順序。Rx運算符的執行順序
我知道的是,最後一個是創建操作符,即。直到訂閱者在那裏(冷觀察),觀察者才創建。
所以,我寫了這個代碼來測試此行爲:
public static void main(String[] args) throws InterruptedException {
test(Schedulers.immediate());
test(Schedulers.computation());
ExecutorService executor = Executors.newCachedThreadPool();
test(Schedulers.from(executor));
executor.shutdown();
test(Schedulers.io());
test(Schedulers.newThread());
test(Schedulers.trampoline());
}
static void test(Scheduler scheduler) throws InterruptedException {
System.out.printf("-------%s--------\n", scheduler);
Observable<Integer> create = Observable.create(c -> {
c.onNext(1);
c.onCompleted();
print("CREATE");
});
create
.subscribeOn(scheduler)
.observeOn(scheduler) .map(e -> { print("MAP"); return e * 2; })
.observeOn(scheduler) .subscribe(a -> { print("SUBSCRIBE");});
TimeUnit.MILLISECONDS.sleep(200);
}
static synchronized void print(String s) {
System.out.printf("%s %s\n", s, Thread.currentThread());
}
輸出(類似的多次運行)
[email protected]-
MAP Thread[main,5,main]
SUBSCRIBE Thread[main,5,main]
CREATE Thread[main,5,main]
[email protected]--
CREATE Thread[RxComputationScheduler-3,5,main]
MAP Thread[RxComputationScheduler-2,5,main]
SUBSCRIBE Thread[RxComputationScheduler-1,5,main]
[email protected]
MAP Thread[pool-1-thread-2,5,main]
CREATE Thread[pool-1-thread-3,5,main]
SUBSCRIBE Thread[pool-1-thread-1,5,main]
[email protected]----
CREATE Thread[RxIoScheduler-4,5,main]
MAP Thread[RxIoScheduler-3,5,main]
SUBSCRIBE Thread[RxIoScheduler-2,5,main]
[email protected]-
MAP Thread[RxNewThreadScheduler-2,5,main]
SUBSCRIBE Thread[RxNewThreadScheduler-1,5,main]
CREATE Thread[RxNewThreadScheduler-3,5,main]
[email protected]--
MAP Thread[main,5,main]
SUBSCRIBE Thread[main,5,main]
CREATE Thread[main,5,main]
看起來既immediate
和trampoline
調度(均是上運行主線程),執行我期望的正確方式。
但是其他的調度是不同的(但我synchronizing
的方法print
,這意味着據我所知,我阻止競爭情況發生在std output
)。
那麼,爲什麼會發生這種情況呢?
感謝;我只是注意到按以下順序創建的線程:(訂閱,映射,創建) - 基於線程號。但它應該是地圖susbscibe創建在主線程的情況下? –
@MuhammadHewedy線程創建順序無關緊要。無論如何不能保證特定的順序。這就是多線程的本質。您每次運行程序時是否收到相同的結果(所有調度程序的訂單都相同)?嘗試並檢查5-10次。 –