2016-11-08 177 views
7

什麼線程退訂當我們沒有指定它,但仍然指定subscribeOn的線程時默認爲? 我們是否需要指定我們希望取消訂閱的線程,即使它與subscribeOn中使用的線程相同?unSubscribeOn調用了什麼線程?我們應該叫它嗎?

或者是底部的兩個片段對un-subscription執行相同操作嗎?

選項1:

mSubscription.add(myAppProvider 
     .getSomeData() 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .unsubscribeOn(Schedulers.io()) 
     .subscribe(data -> handleData(data), 
       throwable -> handleError(throwable) 
       )); 

選項2:

mSubscription.add(myAppProvider 
     .getSomeData() 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(data -> handleData(data), 
       throwable -> handleError(throwable) 
       )); 

我沒看Rx-Java docs但他們只解釋subscribeOn但隻字未提unSubscribeOn

回答

3

沒有subscribeOn(和不observeOn) ,您的取消訂閱操作將在訂閱開始的任何線程上發生。

使用subscribeOn,您的取消訂閱操作將發生在由subscribeOn指定的調度程序中。

對於observeOn,您的取消訂閱操作將在由observeOn(覆蓋由subscribeOn指定的調度程序)指定的調度程序上發生。

這是sample。正如那裏所暗示的,當退訂本身涉及想要在其他線程上運行的長時間運行的操作時,這很有用。

如果運行他們的測試代碼:

Observable<Object> source = Observable.using(
    () -> { 
     System.out.println("Subscribed on " + Thread.currentThread().getId()); 
     return Arrays.asList(1,2); 
    }, 
    (ints) -> { 
     System.out.println("Producing on " + Thread.currentThread().getId()); 
     return Observable.from(ints); 
    }, 
    (ints) -> { 
     System.out.println("Unubscribed on " + Thread.currentThread().getId()); 
    } 
); 

source 
    .unsubscribeOn(Schedulers.newThread()) 
    .subscribe(System.out::println); 

你應該看到他們的預期輸出:

Subscribed on 1 
Producing on 1 
1 
2 
Unubscribed on 11 

如果刪除unsubscribeOn線,你會看到:

Unsubscribed on 1 
+0

嘿,謝謝你的回答。我喜歡你對unSubscribeOn正在發生的線程的解釋。然而,這個例子並不適用你在我創建的樣本中顯示的方式進行測試。對於我退訂也發生在同一個線程上。 '1'。想要修復您的代碼,以便我可以接受它?另一個是你提到的'文檔'實際上是一個樣本。不是真正的Rx-Java文檔。真正的文檔在這裏http://reactivex.io/intro.html。 – achie

+0

好啊,絕對不是真正的文檔。你能發佈你用來檢查線程的完整代碼嗎? – drhr

+0

是的,我將添加爲一個單獨的答案,以便您可以使用它來更新您的答案。或者可能會修復它,如果我在我的方法中做任何錯誤:) – achie

3

這些片段具有不同的行爲。

一般來說,取消訂閱是按照操作員順序進行的,可以由任何線程發起(只需從任何線程調用subscriber.unsubscribe())。如果沒有unsubscribeOn運算符,那麼取消訂閱操作可能會完成對其被調用的線程的操作。 unsubscribeOn提供了用於在其上游取消訂閱的線程的更好的控制。

1

什麼線程取消訂閱當我們不指定它時默認爲 但仍指定subscribeOn的線程?

  • 默認情況下,當既不subscribeOn/observeOn/unsubscribeOn的是 集,則unsubscribeOn(以及其他)默認爲 當前線程。

  • 如果我們設置subscribeOn一個線程並沒有對 observeOn/unsubscribeOn,然後unsubscribeOn將使用subscribeOn指定的同一個線程 。

  • 如果我們同時調用subscribeOn和ObserveOn而不取消訂閱,那麼 unsubscribeOn將使用observeOn中指定的線程。

  • 如果所有這三種方法(subscribeOn,observeOn和unsubscribeOn)是 集,然後unsubscribeOn將使用 unsubscribeOn指定的線程。實際上,取消訂閱將會在線程 上發生,在unsubscribeOn方法中指定,而不管前面的 方法是否設置天氣。

難道我們需要指定我們想要的,即使是同一個線程中 subscribeOn所使用的非訂閱 發生線程?

  • 如所解釋的,如果它被設置在上述observeOn如果沒有設置unsubscribeOn然後unsubscribeOn 發生。如果不是,則發生在由subscribeOn設置的 線程上。現在我們不需要爲取消訂閱設置不同的 線程,除非您在取消訂閱時執行一些長時間運行的任務 。在大多數情況下或至少從我的代碼中,這是 是真實的,所以我們不必設置不同的線程。

下面是我創建的一個示例,可用於在Android中測試上述內容。只需根據需要評論或更改線索即可測試各種結果。

public void testRxThreads() { 
     createThreadObservable() 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .unsubscribeOn(Schedulers.newThread()) 
      .subscribe(printResult()); 
    } 

    private Observable<String> createThreadObservable() { 
     return Observable.create(subscriber -> { 
      subscriber.add(new Subscription() { 
       @Override 
       public void unsubscribe() { 
        System.out.println("UnSubscribe on Thread: " 
         + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); 
        // perform unsubscription 
       } 

       @Override 
       public boolean isUnsubscribed() { 
        return false; 
       } 
      }); 

      subscriber.setProducer(n -> { 
       System.out.println("Producer thread: " 
        + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); 
      }); 

      subscriber.onNext("Item 1"); 
      subscriber.onNext("Item 2"); 
      subscriber.onCompleted(); 
     }); 
    } 

    private Action1<String> printResult() { 
     return result -> { 
      System.out.println("Subscriber thread: " 
       + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); 
      System.out.println("Result: " + result); 
     }; 
    } 

這產生下列結果

Producer thread: 556 RxIoScheduler-2 
Subscriber thread: 1 main 
Result: Item 1 
Subscriber thread: 1 main 
Result: Item 2 
UnSubscribe on Thread: 557 RxNewThreadScheduler-1 

注出或移除unsubscribeOn生成以下。

Subscriber thread: 1 main 
Result: Item 1 
Subscriber thread: 1 main 
Result: Item 2 
UnSubscribe on Thread: 1 main 

刪除了這兩個observeOn和unsubscribeOn調用產生如下:

Producer thread: 563 RxIoScheduler-2 
Subscriber thread: 563 RxIoScheduler-2 
Result: Item 1 
Subscriber thread: 563 RxIoScheduler-2 
Result: Item 2 
UnSubscribe on Thread: 563 RxIoScheduler-2 

感謝drhr事件的初步解釋。這幫助我更多地研究並驗證上述樣本的結果。

+0

這一切看起來都對。你的結果有什麼不同? – drhr

+0

哦,我遇到的問題是,您鏈接到的示例中顯示的示例聲明Unubscribe on會在unSubscribeOn中指定的線程上發生,但我無法讓它爲我工作。相反,它只發生在subscribeOn使用的線程上。我在你的樣品上做了這個和它一起玩。來源 .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.newThread()) .subscribe(System.out :: println);這就是說,訂閱和取消訂閱發生在同一個線程'Unubscribed on 564 RxIoScheduler-3' – achie

+0

哦,即使我沒有調用subscribeOn,只是在unsubscribeOn中指定了一個線程,它並沒有使用新的線程。相反,一切都是從同一個線索發生的。 1而不是11作爲答案中的線程ID。 – achie