2016-06-13 63 views
0

我想在後臺線程中將對象映射/轉換爲另一個對象,並在完成單個對話後立即將其放入主線程中。RxJava地圖並在完成單個地圖後立即發射

Observable.just(1,2,3,4,5) 
      .map(new Func1<Integer, String>() { 
       @Override 
       public String call(Integer integer) { 
        Log.d(TAG, "mapping number " + integer); 
        return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
       } 
      }) 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Observer<String>() { 
       @Override 
       public void onCompleted() { 
        Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onError(Throwable e) { 
       } 

       @Override 
       public void onNext(String integer) { 
        Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       } 
      }); 

結果是:

D: mapping number 1 
D: mapping number 2 
D: mapping number 3 
D: mapping number 4 
D: mapping number 5 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 

但是轉換可能花了一段時間,我希望儘快轉換完成後接收它們。

D: mapping number 1 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 2 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 3 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 4 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 5 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 
+3

你的問題是什麼? RxJava會像你的描述一樣。您問題中的日誌僅僅是因爲轉換太快。在map函數中添加'Thread.sleep(1000)',你會看到不同的日誌。 – zsxwing

+0

@zsxwing工作。 – Pedram

回答

3

沒有必要設置全局緩衝區的大小,只需用observeOn(Scheduler, int)重載,您可以指定預取值爲1.如果先前的值已處理,則只會請求下一個值。

0

這是由於RxJava對上面使用的鏈中的操作員施加反壓。諸如ObserveOn這樣的下游運營商通過塊向上遊請求數據,而不是通過單個項目提高效率。如果你設置緩衝區的大小爲一,這將有效地實現,你會與效率的成本會發生什麼:

-Drx.ring-buffer.size=1 

具體來說,這將是對於具有昂貴的往返調用上行流相當可怕的。

編輯

您可以使用BehaviorSubject拉鍊進行排序的序列化的向下和向上流排放:

BehaviorSubject<Void> signal = BehaviorSubject.create(); 
signal.onNext(null); // <- pair up the signal with the first item immediately 
Observable.just(1,2,3,4,5) 
     .zipWith(signal, (item,v)->item) //only emit a next item when there is a "receipt acknowledgement" from the down stream 
     .observeOn(Schedulers.newThread()) //<- needed to avoid fetching subsequent items in UI thread 
     .map(new Func1<Integer, String>() { 
      @Override 
      public String call(Integer integer) { 
       Log.d(TAG, "mapping number " + integer); 
       return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
      } 
     }) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Observer<String>() { 
      @Override 
      public void onCompleted() { 
       Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
      } 

      @Override 
      public void onError(Throwable e) { 
      } 

      @Override 
      public void onNext(String integer) { 
       Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       signal.onNext(null); //<- acknowledge receipt - allow emitting next item from upstream 
      } 
     });