2017-01-19 114 views
2

我想弄清楚Rx運算符的執行順序。Rx運算符的執行順序

我知道的是,最後一個是創建操作符,即。直到訂閱者在那裏(冷觀察),觀察者才創建。

所以,我寫了這個代碼來測試此行爲:

public static void main(String[] args) throws InterruptedException { 

    test(Schedulers.immediate()); 
    test(Schedulers.computation()); 
    ExecutorService executor = Executors.newCachedThreadPool(); 
    test(Schedulers.from(executor)); 
    executor.shutdown(); 
    test(Schedulers.io()); 
    test(Schedulers.newThread()); 
    test(Schedulers.trampoline()); 

} 


static void test(Scheduler scheduler) throws InterruptedException { 
    System.out.printf("-------%s--------\n", scheduler); 

    Observable<Integer> create = Observable.create(c -> { 
     c.onNext(1); 
     c.onCompleted(); 
     print("CREATE"); 
    }); 

    create 
    .subscribeOn(scheduler) 
    .observeOn(scheduler) .map(e -> { print("MAP"); return e * 2; }) 
    .observeOn(scheduler) .subscribe(a -> { print("SUBSCRIBE");}); 

    TimeUnit.MILLISECONDS.sleep(200); 
} 

static synchronized void print(String s) { 
    System.out.printf("%s %s\n", s, Thread.currentThread()); 
} 

輸出(類似的多次運行)

[email protected]- 
MAP Thread[main,5,main] 
SUBSCRIBE Thread[main,5,main] 
CREATE Thread[main,5,main] 
[email protected]-- 
CREATE Thread[RxComputationScheduler-3,5,main] 
MAP Thread[RxComputationScheduler-2,5,main] 
SUBSCRIBE Thread[RxComputationScheduler-1,5,main] 
[email protected] 
MAP Thread[pool-1-thread-2,5,main] 
CREATE Thread[pool-1-thread-3,5,main] 
SUBSCRIBE Thread[pool-1-thread-1,5,main] 
[email protected]---- 
CREATE Thread[RxIoScheduler-4,5,main] 
MAP Thread[RxIoScheduler-3,5,main] 
SUBSCRIBE Thread[RxIoScheduler-2,5,main] 
[email protected]- 
MAP Thread[RxNewThreadScheduler-2,5,main] 
SUBSCRIBE Thread[RxNewThreadScheduler-1,5,main] 
CREATE Thread[RxNewThreadScheduler-3,5,main] 
[email protected]-- 
MAP Thread[main,5,main] 
SUBSCRIBE Thread[main,5,main] 
CREATE Thread[main,5,main] 

看起來既immediatetrampoline調度(均是上運行主線程),執行我期望的正確方式。

但是其他的調度是不同的(但我synchronizing的方法print,這意味着據我所知,我阻止競爭情況發生在std output)。

那麼,爲什麼會發生這種情況呢?

回答

0

immediatetrampoline調度程序使用當前(單個)線程,所以執行的順序是嚴格定義的。

所有其他調度程序都是多線程的。您將三個任務安排到三個不同的線程。

MAP應始終在SUBSCRIBE之前出現,因爲SUBSCRIBE僅在MAP完成後(map()的結果傳遞給訂閱者)進行調度。

除此之外,絕對不能保證任務序列化的順序(通過您的print函數)。

+0

感謝;我只是注意到按以下順序創建的線程:(訂閱,映射,創建) - 基於線程號。但它應該是地圖susbscibe創建在主線程的情況下? –

+0

@MuhammadHewedy線程創建順序無關緊要。無論如何不能保證特定的順序。這就是多線程的本質。您每次運行程序時是否收到相同的結果(所有調度程序的訂單都相同)?嘗試並檢查5-10次。 –

1

既然你的可觀察性是冷鏈不會開始,直到你打電話訂閱。

當您調用訂閱鏈啓動時,首先調用rx.Observable.OnSubscribe#call然後當您調用rx.Observer#onNext值提交給鏈。由於您指定了調用map的調度程序發佈到另一個線程,並且主線程有時間(或不是)來完成rx.Observable.OnSubscribe#call的執行。

如果移動print("CREATE")上述rx.Observer#onNext序列將始終創建 - >地圖 - >訂閱

在這種情況下,MAP總是前訂閱。如果一切都在未定義位置的其他線程中運行,則CREATE爲last。由於線程切換,位置不確定。根據意見

更新爲什麼訂閱線程MAP線程之前創建的?

每個運算符都可以對另一個運算符進行換行並返回它們。

當您從最後創建的observable調用subscribe()調用rx.Observable.OnSubscribe#call時。

然後通過堆棧進行回溯。

rx.internal.operators.OnSubscribeMap#call 
rx.internal.operators.OperatorObserveOn#call 
rx.internal.operators.OnSubscribeMap#call 
rx.internal.operators.OperatorSubscribeOn#call 
... 

如果你看看OperatorObserveOn(cuted代碼)

public final class OperatorObserveOn<T> implements Operator<T, T> { 

    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
     this.scheduler = scheduler; 
     this.delayError = delayError; 
     this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
    } 

    @Override 
    public Subscriber<? super T> call(Subscriber<? super T> child) { 
     if (scheduler instanceof ImmediateScheduler) { 
      // avoid overhead, execute directly 
      return child; 
     } else if (scheduler instanceof TrampolineScheduler) { 
      // avoid overhead, execute directly 
      return child; 
     } else { 
      ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); 
      parent.init(); 
      return parent; 
     } 
    } 

    /** Observe through individual queue per observer. */ 
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { 
     public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { 
      this.child = child; 
      this.recursiveScheduler = scheduler.createWorker(); 
      this.delayError = delayError; 
      this.on = NotificationLite.instance(); 
      int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
      // this formula calculates the 75% of the bufferSize, rounded up to the next integer 
      this.limit = calculatedSize - (calculatedSize >> 2); 
      if (UnsafeAccess.isUnsafeAvailable()) { 
       queue = new SpscArrayQueue<Object>(calculatedSize); 
      } else { 
       queue = new SpscAtomicArrayQueue<Object>(calculatedSize); 
      } 
      // signal that this is an async operator capable of receiving this many 
      request(calculatedSize); 
     } 
    } 
} 

你可以看到它在createNewWorkercall

因此,每個observeOn以相反的順序爲其下方的執行操作員創建新的工作人員。

正如你可以easyly與看到Schedulers.newThread()

CREATE(3) - > MAP(2) - > SUBSCRIBE(1)

+0

無論CREATE;那麼爲什麼SUBSCRIBE線程在上面的MAP線程之前創建? –

+0

我沒有看到在訂閱地圖之前訂閱的任何情況 –

+0

在情況2,3,4,5(除了第一個和最後一個)之外,SUBSCRIBE的線程號在「MAP」的線程號之前。這不是說它在它之前被執行(實際的線程運行順序不是我所說的) –