我有以下代碼,它使用PublishSubject
。與RxJava2意外的行爲PublishSubject
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
1)I從它創建一個Observable
並將其映射(對於此示例的目的,我只是轉換爲String
)=>o1
。
2)然後我訂閱o1
3次。
3)最後,我通過調用subject.onNext(1)
「發佈」一個事件。
出乎我的意料,我得到以下的輸出:
Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1
map
最終被稱爲3次,我不明白爲什麼,因爲我訂閱o1
發生map
之後應該發生。我肯定錯過了什麼。任何幫助,將不勝感激。
感謝 燕
您訂閱了'o1'三次,每次創建一個獨立的序列,直到'PublishSubject'將發佈onNext'到所有3個鏈。 – akarnokd
你說「直到'PublishSubject'」:爲什麼它一直到主題?你能否指出我會在哪裏解釋?如果這是正常的行爲有沒有一種方法來轉換地圖後的流,以便它不這樣做? – yan
因爲從所有3個訂閱者的角度來看,PublishSubject是一個多播源,它通過subscribe()調用建立的獨立鏈向他們發送事件。 – akarnokd