2015-10-20 50 views
0

我試圖實現與RxJava以下邏輯:執行請求或訂閱結果與RxJava

  1. 執行服務器發出請求,如果我沒有價值本地
  2. 結果提供給用戶
  3. 如果請求正在運行,請不要創建第二個請求,但請訂閱運行請求的結果。

下面的解決方案部分地解決了這個問題:

private final ExecutorService executor = Executors.newSingleThreadExecutor(); 

private Observable<T> getValue(){ 
    if(storage.getValue() == null) { 
     Future<T> futureValue = executor.submit(new Callable<T>() { 
      @Override 
      public T call() throws Exception { 
       return getValueFromStorageOrBackend(); 
      } 
     }); 
     return Observable.from(futureValue); 
    } else { 
     return Observable.just(storage.getValue()); 
    } 
} 

private String getValueFromStorageOrBackend() { 
    final String currentValue = storage.getValue(); 
    if (currentValue == null) { 
     Response response = backend.requestValue(); 
     storage.setValue(response.getValue()); 

     return response.getValue(); 
    } 

    return currentValue; 
} 

對此有一個優雅的純RxJava解決方案?

回答

1

其中一個簡單的方法來做到這一點是通過AsyncSubject +一個比較並設置循環:

final AtomicReference<AsyncSubject<T>> subjectRef = new AtomicReference<>(); 

public Observable<T> get() { 
    for (;;) { 
     AsyncSubject<T> subject = subjectRef.get(); 
     if (subject != null) { 
      return subject; 
     } 
     AsyncSubject<T> next = AsyncSubject.create(); 
     if (subjectRef.compareAndSet(null, next)) { 
      Observable.just(1).map(ignored -> { 
       // your computation here 
       return 2; // the result 
      }).subscribeOn(Schedulers.io()).subscribe(subject); 
      return; 
     } 
    } 
} 

我差點忘了,一個更簡單的方法是使用cache

Observable 
.just(1) 
.subscribeOn(Schedulers.io()) 
.map(ignored -> { 
    // computation here 
    return 2; 
}).cache(); 
+0

在'只是(1)'我會返回我的儲值。在map()中,我將檢查這個值是否爲null,並在需要時執行請求。這是對的嗎?什麼保證請求只執行一次? – Zzokk

+0

爲什麼在第一個例子中有一個無限循環? – Zzokk

+0

這是一個CAS循環,以防併發調用者得到()並且只有一個人應該觸發後臺計算。這裏有一個'返回'。由於狀態改變,循環在退出前最多執行兩次。 – akarnokd