2016-08-12 55 views
2

我一直讀了幾文檔的背壓RxJava,但我不能找到喜歡它是如何在庫內部發生的詳細解釋,每個人都只是概括它像「生產者」是太快, 「消費者」太慢了。如何背壓RxJava內部發生

例如像下面的代碼:

Observable.interval(1, TimeUnit.MILLISECONDS) 
    .observeOn(Schedulers.newThread()) 
    .subscribe(
     i -> { 
      System.out.println(i); 
      try { 
       Thread.sleep(100); 
      } catch (Exception e) { } 
     }, 
     System.out::println); 

我一直經歷的RxJava源代碼,所以我的理解是,在主線程中,我們要發出的每毫秒的事件,一旦我們發出它,我們通過將值的System.out.println(我)方法,並把它扔進了newThead調度的線程池,運行裏面一個可運行的方法。

所以我的問題是,如何將內部例外發生?原因當我們調用Thread.sleep()方法,我們只是在睡覺,處理方法調用線程 - >的System.out.println(),而不在線程池中影響其他線程,爲什麼會導致異常。是否因爲線程池不再有足夠的可用線程?

感謝

回答

3

你能想到的背壓作爲許可證制度一個操作員的手了其上游源:你可以給我128元。稍後,該運營商可能會說「好吧,再給我96個」,因此總共可以有224個未完成的許可證。有些來源,如interval不關心許可證,只是定期遞交價值觀。由於許可證的數量通常是強烈依賴於可用容量在隊列或緩存器,除了這些存儲器端出更可容納產量MissingBackpressureException

檢測到背壓違規主要發生在有限隊列的offer返回false時,例如observeOn中的指示隊列已滿。

檢測違反是由操作者跟蹤的突出允許計數的第二種方式,例如onBackpressureDrop並且每當向上遊發送比這更多的,操作者只需將不轉發它:

// in onBackpressureDrop 
public void onNext(T value) { 
    if (emitted != availablePermits) { 
     emitted++; 
     child.onNext(value); 
    } else { 
     // ignoring this value 
    } 
} 

的兒童用戶信號經由請求()其允許通常導致像這樣在onBackpressureDrop

public void childRequested(long n) { 
    availablePermits += n; 
} 

在實踐中,由於可能異步執行,availablePermits是一個ÑAtomicLong(並且被稱爲requested)。

+0

您是說RxJava中的一些運算符會將onNext()事件放入隊列或緩衝區數據結構中嗎?所以這個異常不是由於線程池所致;)? – Qing