2016-11-06 109 views
1

我正在尋找一種方法將多個訂閱者附加到RxJava Observable流,每個訂閱者異步處理髮出的事件。RxJava併發與多個訂閱者和事件

我第一次嘗試使用.flatMap(),但似乎沒有任何後續用戶工作。所有訂閱者都在同一個線程上處理事件。

.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread())) 

什麼結束了工作,通過創建一個新的可觀察每個耗時在一個新的線程每個事件:

Observable.from(Arrays.asList(new String[]{"1", "2", "3"})) 
      .subscribe(j -> { 
       Observable.just(j) 
         .subscribeOn(Schedulers.newThread()) 
         .subscribe(i -> { 
          try { 
           Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); 
          } catch (InterruptedException e) { 
           e.printStackTrace(); 
          } 
          System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i); 
         }); 
      }); 

輸出:

s1=>RxNewThreadScheduler-1=>1 
s1=>RxNewThreadScheduler-2=>2 
s1=>RxNewThreadScheduler-3=>3 

並與多個用戶的最終結果:

ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"})) 
      .publish(); 

    e.subscribe(j -> { 
     Observable.just(j) 
       .subscribeOn(Schedulers.newThread()) 
       .subscribe(i -> { 
        try { 
         Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); 
        } catch (InterruptedException e1) { 
         e1.printStackTrace(); 
        } 
        System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i); 
       }); 
    }); 

    e.subscribe(j -> { 
     Observable.just(j) 
       .subscribeOn(Schedulers.newThread()) 
       .subscribe(i -> { 
        try { 
         Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); 
        } catch (InterruptedException e1) { 
         e1.printStackTrace(); 
        } 
        System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i); 
       }); 
    }); 

    e.connect(); 

輸出:

s2=>RxNewThreadScheduler-4=>2 
s1=>RxNewThreadScheduler-1=>1 
s1=>RxNewThreadScheduler-3=>2 
s2=>RxNewThreadScheduler-6=>3 
s2=>RxNewThreadScheduler-2=>1 
s1=>RxNewThreadScheduler-5=>3 

但是,這似乎有點笨重。有沒有更優雅的解決方案,或者RxJava不是一個很好的用例?

回答

0

使用.flatMap(s -> Observable.just(s).observeOn(Schedulers.newThread())....)

+0

我原本想這(兩.observeOn和.subscribeOn),但每當我介紹了我的用戶計算,除幾毫秒的時間花費了如Thread.sleep(100),一切都結束了在同一個線程上再次處理。 –

0

,如果我理解正確的RX-合同,你正在嘗試做一些事情,這是反對的。

讓我們看看在合同

的RxJava可觀察到的契約是事件(onNext(),onCompleted()以及OneR ROR())不能被同時發射。換句話說,一個單一的Observable 流必須總是被序列化並且是線程安全的。只要排放不同時,每個事件都可以從不同的線程發出。這意味着 inter- 離開或onNext()的同時執行。如果onNext()仍然在 一個線程上執行,另一個線程不能再次調用它(交錯)。 --Tomasz Nurkiewicz在Reactive Programming with RxJava

在我看來,你試圖通過在外部訂閱中使用嵌套訂閱來破壞合約。對用戶的onNext調用不再被序列化。與doOnNext上沿Observable

ConnectableObservable<String> stringObservable = Observable.from(Arrays.asList(new String[]{"1", "2", "3"})) 
      .flatMap(s -> { 
       return Observable.just(s).subscribeOn(Schedulers.computation()); 
      }) 
      .publish(); 

    stringObservable 
      .flatMap(s -> { 
       // do More asyncStuff depending on subscription 
       return Observable.just(s).subscribeOn(Schedulers.newThread()); 
      }) 
      .subscribe(s -> { 
       // use result here 
      }); 

    stringObservable 
      .subscribe(s -> { 
       // use immediate result here. 
      }); 

    stringObservable.connect(); 
+0

我認爲合同適用於Observable而不是訂戶。我沒有試圖用併發事件創建一個單一的Observable流。但是,我試圖讓每個事件都有多個不相關的訂閱者。由於它們不相關,我想在不同的線程中處理它們以利用並行化。在我目前的用例中,用戶進行了一系列不需要任何未來處理的網絡調用,因此我不確定將它移到.flatMap()運算符會帶來什麼好處。 –

+0

「我沒有試圖用併發事件創建單個Observable流。」其實你正在努力做到這一點。由於發佈/連接,您只有一個流將從訂閱調用onNext。 onNext調用是序列化的。讓我們看看Subscriber接口:void onNext(String s)。您應該爲每個訂閱創建一個新的observable,它將在flatMap中調用您的操作並對其進行訂閱並訂閱該特定訂閱。 –

0

flatMapflatMap內會導致相同的:

爲什麼不從用戶移動「異步」 -workload到flatMap運營商和訂閱新的可觀測輸出爲你的。

onNext()總是以順序方式調用,因此使用doOnNext之後flatMap也不適用於您。由於同樣的原因,在最後的subscribe內寫入行動在你的情況下不起作用。

以下代碼是使用RxJava2編寫的。在RxJava版本1中,您需要在Thread.sleep附近添加try-catch塊。

ConnectableObservable<String> e = Observable.just("1", "2", "3").publish(); 

e.flatMap(
     s -> Observable.just(s) 
      .subscribeOn(Schedulers.newThread()) 
      .doOnNext(i -> { // <<<<<< 
       Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); 
       System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i); 
      })) 
    .subscribe(); 

e.flatMap(
     s -> Observable.just(s) 
      .subscribeOn(Schedulers.newThread()) 
      .doOnNext(i -> { // <<<<<< 
       Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); 
       System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i); 
      })) 
    .subscribe(); 

e.connect();