2017-04-27 145 views
1

我正在查看是否有一種方法可以從Observable同步返回一個緩存值,否則可能需要很長時間才能發出。當然,如果它需要執行它的io /計算,那麼它應該在計算線程上執行它,但是如果它已經在之前完成了,那麼它應該是同步的並且避免在線程之間來回跳轉。下面是我的意思了一些示例代碼:RxJava:同步/立即返回一個緩存的值

public void bind(ItemViewHolder holder) { 
    getCalculationObservable() 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(value -> { 
        holder.showValue(value); // This will happen after bind has finished 
       } 
      }); 
} 

public Observable<Integer> getCalculationObservable() { 
    if (mObservable == null) { 

     mObservable = Observable.fromCallable(this::calculate) 
       .subscribeOn(Schedulers.computation()) 
       .cache(); 

    } 
    return mObservable; 
} 

public int calculate() throws InterruptedException { 
    Thread.sleep(1000); 
    return mValue * 1000; 
} 

編輯:說明什麼我談論:

void onRunSchedulerExampleButtonClicked() throws InterruptedException { 


    Observable<Integer> observable = Observable 
      .fromCallable(this::calculate) 
      .subscribeOn(Schedulers.computation()) 
      .cache(); 

    observable 
      .doOnNext(value -> { 
       Log.e("log", "first onNext()"); 
      }) 
      .test().await(); 

    observable 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnNext(value -> { 
       Log.e("log", "second onNext()"); 
      }) 
      .test().await(); 

    Log.e("log", "this is first."); 

} 

結果:

E/log: onClick 
E/log: first onNext() 
E/log: this is first. 
E/log: second onNext() 

爲了進一步說明這一點,如果你在第二個onNext鏈上添加一個await(),你將永遠不會完成它,因爲它將等待在你阻塞的同一個線程中排隊的東西。

+1

使用'BehaviourSubject'作爲:訂閱客戶主題,並在內部使您的可觀察推送項目進入該主題。這就是你可以如何執行後臺線程和排放(主線程上的緩存/新值)。 – Than

回答

3

更新:

AndroidSchedulers.mainThread()調度應用observeOn,下游事件得到通過postDelayed內部貼到MessageQueue。這就是爲什麼位於第二個Observable之後的代碼在Observable完成之前執行之前(或者如果我們使用test().await(),則凍結)。一種可能的解決方案是使用Subjects作爲您的數據源和訂戶之間的代理。檢查這篇文章的更多信息 - Keep Your Main Thread Synchronous

而且有用的文章:


解釋爲什麼cache不切換線程:

您的Observable已同步返回緩存值,因爲cache沒有訂閱每個訂閱者的整個上游(因此在您的情況下它不會切換線程)。它做了一次,然後只記得項目的順序。對於每個新用戶,cache只是重播它。


實施例: (寫在科特林

//here is the same logic as yours 
private var observable: Observable<Int>? = null 
    get() { 
     if(field==null) 
      field = Observable.fromCallable { 
       System.out.println("callable: execution thread - ${Thread.currentThread().name}") 
       Thread.sleep(1000) 
       [email protected] 1000 
      } 
        .subscribeOn(Schedulers.computation()) 
        .doOnNext  { System.out.println("cached Observable: before cache() - doOnNext execution thread - ${Thread.currentThread().name}") } 
        .doOnComplete { System.out.println("cached Observable: before cache() - doOnComplete execution thread - ${Thread.currentThread().name}") } 
        .cache() 
        .doOnNext  { System.out.println("cached Observable: after cache() - doOnNext execution thread - ${Thread.currentThread().name}") } 
        .doOnComplete { System.out.println("cached Observable: after cache() - doOnComplete execution thread - ${Thread.currentThread().name}") } 

     return field 
    } 

@Test 
fun test() { 
    observable!! 
      .doOnSubscribe { System.out.println("first get: doOnSubscribe execution thread - ${Thread.currentThread().name}") } 
      .doOnNext  { System.out.println("first get: doOnNext execution thread - ${Thread.currentThread().name}") } 
      .doOnComplete { System.out.println("first get: doOnComplete execution thread - ${Thread.currentThread().name}") } 
      .test() 
      .await() 

    System.out.println("---------- first get executed ------------") 

    observable!! 
      .doOnSubscribe { System.out.println("second get: doOnSubscribe execution thread - ${Thread.currentThread().name}") } 
      .doOnNext  { System.out.println("second get: doOnNext execution thread - ${Thread.currentThread().name}") } 
      .doOnComplete { System.out.println("second get: doOnComplete execution thread - ${Thread.currentThread().name}") } 
      .subscribe() 
} 

輸出:

first get: doOnSubscribe execution thread - main 
callable: body execution thread - RxComputationThreadPool-1 
cached Observable: before cache() - doOnNext execution thread - RxComputationThreadPool-1 
cached Observable: after cache() - doOnNext execution thread - RxComputationThreadPool-1 
first get: doOnNext execution thread - RxComputationThreadPool-1 
cached Observable: before cache() - doOnComplete execution thread - RxComputationThreadPool-1 
cached Observable: after cache() - doOnComplete execution thread - RxComputationThreadPool-1 
first get: doOnComplete execution thread - RxComputationThreadPool-1 
---------- first get executed ------------ 
second get: doOnSubscribe execution thread - main 
cached Observable: after cache() - doOnNext execution thread - main 
second get: doOnNext execution thread - main 
cached Observable: after cache() - doOnComplete execution thread - main 
second get: doOnComplete execution thread - main 

正如你可以看到,當存在被緩存的值,線程沒有按不會被切換。

P.S.我假設你使用RxJava2。

+0

其實,它沒有。雖然第二個get會出現在主線程中,但它不會同步返回。相反,它會在循環中排隊並稍後執行。這可以通過第二次獲得後的日誌輕鬆驗證(當然,這裏不能使用await/test)。 –

+0

值得注意的是,一個很大的區別是你的observable不會做observesOn,這似乎會強制排隊。 observesOn(mainThread)基本上是一個要求,因爲不能觸及主線程和所有的視圖。 –

+0

我忽略了observeOn故意顯示當存在緩存值時線程不會切換。我已經編輯了添加大量日誌記錄調用的答案。還有 - 當有第二個獲取 - 計算排程器不涉及。 –