2017-06-19 51 views
0

我有以下代碼,它使用PublishSubject與RxJava2意外的行爲PublishSubject

val subject = PublishSubject.create<Int>() 

val o1: Observable<String> = 
     subject.observeOn(Schedulers.newThread()).map { i: Int -> 
      println("${Thread.currentThread()} | ${Date()} | map => $i") 
      i.toString() 
     } 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it") 
} 

println("${Thread.currentThread()} | ${Date()} | submitting 1") 

subject.onNext(1) 

1)I從它創建一個Observable並將其映射(對於此示例的目的,我只是轉換爲String)=>o1

2)然後我訂閱o1 3次。

3)最後,我通過調用subject.onNext(1)「發佈」一個事件。

出乎我的意料,我得到以下的輸出:

Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1 

map最終被稱爲3次,我不明白爲什麼,因爲我訂閱o1發生map之後應該發生。我肯定錯過了什麼。任何幫助,將不勝感激。

感謝 燕

+0

您訂閱了'o1'三次,每次創建一個獨立的序列,直到'PublishSubject'將發佈onNext'到所有3個鏈。 – akarnokd

+0

你說「直到'PublishSubject'」:爲什麼它一直到主題?你能否指出我會在哪裏解釋?如果這是正常的行爲有沒有一種方法來轉換地圖後的流,以便它不這樣做? – yan

+0

因爲從所有3個訂閱者的角度來看,PublishSubject是一個多播源,它通過subscribe()調用建立的獨立鏈向他們發送事件。 – akarnokd

回答

1

從評論:

您訂閱o1三次,每次創建一個獨立的序列,直到PublishSubject將派遣onNext所有3支鏈。

從所有3個用戶的角度來看,PublishSubject是一個組播源,它通過由subscribe()調用建立的獨立鏈向他們發送事件。

Subject上應用運算符通常不會使整個鏈變熱,因爲這些運算符元素僅在訂閱時才附加到源Subject。因此,多個訂閱將產生多個通道到相同的上游Subject

使用publish得到一個ConnectableObservable(或另一個PublishSubject在最後),使序列從那時起變得很熱。

+0

我確認在map之後加上'.publish.autoConnect()'就可以了。我不是100%確定你是指另一個'PublishSubject''的意思,但是我的工作方式是添加'.subscribe {otherSubject.onNext(it)}',並在其他主題上進行訂閱。 – yan

+0

要關閉這個循環,我創建了一個要點來演示2個選項https://gist.github.com/ypujante/2ab3c3a135272ea4bc4554cbfc287ca7。最後我選擇了第二個主題,因爲它對我來說似乎更清潔。 – yan