2017-09-02 108 views
1

如何實現活性x(理想情況下是RxJava或RxJs中的示例)?將無限流的無限流轉換爲無限流 - 活性X

a |-a-------------------a-----------a-----------a---- 
s1 |-x-x-x-x-x-x -| (subscribe) 
s2      |-x-x-x-x-x-| (subscribe) 
s2            |-x-x-x-x-x-| (subscribe) 
... 
sn 
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe) 

a是觸發事件的每一個應爲無窮流S的一部分的有限流sn同時能夠訂閱每個sn流(爲了做到求和操作)事件的無限流,但在同時保持流S爲無限。

編輯:爲了更具體我提供了我在Kotlin尋找的實現。 每10秒發射一個事件,映射到4個事件的共享有限流。元流是flatMap - 進入正常的無限流。我利用doAfterNext額外訂閱每個有限流並打印出結果。

/** Creates a finite stream with events 
* $ch-1 - $ch-4 
*/ 
fun createFinite(ch: Char): Observable<String> = 
     Observable.interval(1, TimeUnit.SECONDS) 
       .take(4) 
       .map({ "$ch-$it" }).share() 

fun main(args: Array<String>) { 

    var ch = 'A' 

    Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
      .map { createFinite(ch++) } 
      .doAfterNext { 
       it 
         .count() 
         .subscribe({ c -> println("I am done. Total event count is $c") }) 
      } 
      .flatMap { it } 
      .subscribe { println("Just received [$it] from the infinite stream ") } 

    // Let main thread wait forever 
    CountDownLatch(1).await() 
} 

但是我不確定這是否是'純粹的RX'方式。

+1

這看起來像'concatMap',但從問題如何將每個事件映射到一組N個內部源的問題還不清楚。 – akarnokd

+1

也許增加一個你迄今爲止嘗試過的例子,那會讓我們更清楚你想要完成什麼。 – paulpdaniels

+0

http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - 我在閱讀標題 – inf

回答

0

你不清楚你想如何計算。如果你正在做一個總數,那麼就沒有必要做內部認購:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it } 
     .doOnNext(counter.incrementAndget()) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

在另一方面,如果你需要爲每個中間觀察到的一個數,那麼你可以在裏面移動計數在flatMap()並打印出計數和復位它完成:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it 
        .doOnNext(counter.incrementAndget() 
        .doOnCompleted({ long ctr = counter.getAndSet(0) 
             println("I am done. Total event count is $ctr") 
            }) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

這是不是很實用,但是這種報告往往打破正常流。