我一直讀了幾文檔的背壓RxJava,但我不能找到喜歡它是如何在庫內部發生的詳細解釋,每個人都只是概括它像「生產者」是太快, 「消費者」太慢了。如何背壓RxJava內部發生
例如像下面的代碼:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
我一直經歷的RxJava源代碼,所以我的理解是,在主線程中,我們要發出的每毫秒的事件,一旦我們發出它,我們通過將值的System.out.println(我)方法,並把它扔進了newThead調度的線程池,運行裏面一個可運行的方法。
所以我的問題是,如何將內部例外發生?原因當我們調用Thread.sleep()方法,我們只是在睡覺,處理方法調用線程 - >的System.out.println(),而不在線程池中影響其他線程,爲什麼會導致異常。是否因爲線程池不再有足夠的可用線程?
感謝
您是說RxJava中的一些運算符會將onNext()事件放入隊列或緩衝區數據結構中嗎?所以這個異常不是由於線程池所致;)? – Qing