2017-04-06 120 views
0

這裏以後就是我想實現:合併PublishSubject訂閱

PublishSubject<Integer> subject = PublishSubject.create(); 
    subject.subscribe(new Consumer<Integer>() { 
     @Override 
     public void accept(@NonNull Integer integer) throws Exception { 
      System.out.println("item = " + integer); 
     } 
    }); 
    subject.mergeWith(Observable.just(1, 2)); 
    subject.onNext(3); 
    /* 
    expected: 
    item = 1 
    item = 2 
    item = 3 
    but received : 
    item = 3 
    */ 

我知道我可以做這樣的事情:

PublishSubject.merge(subject, Observable.just(1,2)).subscribe(new Consumer<Integer>() { 
     @Override 
     public void accept(@NonNull Integer integer) throws Exception { 
      System.out.println("item = " + integer); // emits 1 2 3 
     } 
    }); 

但問題是用戶已經訂閱主題。 我找不到一個優雅的方式。

編輯: 作爲研究對象都是觀察者和用戶,你可以做這樣的事情:

final PublishSubject<Integer> subject = PublishSubject.create(); 
    subject.subscribe(new Consumer<Integer>() { 
     @Override 
     public void accept(@NonNull Integer integer) throws Exception { 
      System.out.println("item = " + integer); 
     } 
    }); 
    Observable.just(1,2).subscribe(subject); 
    subject.onNext(3);//subscription 
    /* 
    expected: 
    item = 1 
    item = 2 
    item = 3 
    but received : 
    item = 1 
    item = 2 
    */ 
+0

這不是什麼['ReplaySubject'](http://reactivex.io/RxJava/javadoc/rx/subjects/ReplaySubject.html)是爲了什麼? – jensgram

+1

@jensgram no。事情是subject.mergeWith(Observable.just(1,2)); 返回一個新的Observable/Subject,您不會將Observable合併到原始的Observable中。 –

+0

@PhoenixWang有道理。感謝澄清。因此,用'1'和'2'的'ReplaySubject'替換'Observable.just(1,2)',那麼? – jensgram

回答

1

更改代碼到

Subject<Integer> subject = PublishSubject.create(); 
    subject.subscribe(new Consumer<Integer>() { 
     @Override 
     public void accept(@NonNull Integer integer) throws Exception { 
      System.out.println("item = " + integer); 
     } 
    }); 
    Observable.just(1,2).doOnNext(new Consumer<Integer>() { 
     @Override 
     public void accept(@NonNull Integer e) throws Exception { 
      subject.onNext(e); 
     } 
    }).subscribe(); 

    subject.onNext(3); 

但仍。你需要訂閱這個Observable。