1
如何實現活性x(理想情況下是RxJava或RxJs中的示例)?將無限流的無限流轉換爲無限流 - 活性X
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
是觸發事件的每一個應爲無窮流S
的一部分的有限流sn
同時能夠訂閱每個sn
流(爲了做到求和操作)事件的無限流,但在同時保持流S
爲無限。
編輯:爲了更具體我提供了我在Kotlin尋找的實現。 每10秒發射一個事件,映射到4個事件的共享有限流。元流是flatMap
- 進入正常的無限流。我利用doAfterNext
額外訂閱每個有限流並打印出結果。
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
但是我不確定這是否是'純粹的RX'方式。
這看起來像'concatMap',但從問題如何將每個事件映射到一組N個內部源的問題還不清楚。 – akarnokd
也許增加一個你迄今爲止嘗試過的例子,那會讓我們更清楚你想要完成什麼。 – paulpdaniels
http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - 我在閱讀標題 – inf