2017-07-18 66 views
2

我使用ormlite並將數據庫數據轉換爲rx Observable。RxJava2,OrmLite - 一個可觀察,多個訂戶 - 爲所有訂戶重複數據

public static <T> Observable<T> createObservable(@NonNull Observable<T> observable) { 
    return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()); 
} 


public Observable<List<Country>> getCountriesObservable() { 
    return RxUtils.createObservable(new Observable<List<Country>>() { 
     @Override 
     protected void subscribeActual(Observer<? super List<Country>> observer) { 
      try { 
       List<Country> list = mCountryDao.getCountries(); 
       observer.onNext(list == null ? Collections.<Country>emptyList() : list); 
       observer.onComplete(); 
      } catch (SQLException exc) { 
       Log.e(TAG, exc.getMessage()); 
       observer.onError(exc); 
      } 
     } 
    }); 
} 

我有三個不同的類A,B,C。在這個類中,我創建了三個觀察者。我贊成觀察。

observable.subscribe(observerA); 
observable.subscribe(observerB); 
observable.subscribe(observerC); 

之後我改變了A類數據並更新了數據庫。 是否有方法可以爲所有觀察者說明可重複的新數據?

或者我需要再次寫這段代碼?

observable.subscribe(observerA); 
observable.subscribe(observerB); 
observable.subscribe(observerC); 

回答

0

聲明:我沒有與RxJava直接經驗,只有Rx.NET,RxJS和RxSwift(我沒有做過在10年以上的Java開發)

首先,我會建議在訂閱之前在觀察值上使用share()運算符。這可以確保你沒有額外的工作:

Observable<List<Country>> sharedObservable = observable.share(); 

sharedObservable.subscribe(observerA); 
sharedObservable.subscribe(observerB); 
sharedObservable.subscribe(observerC); 

可觀察到的是數據流。現在它只包含一個值,但你可以讓它返回多個值。所有用戶當然會收到新的值。

您需要通知數據已更新,然後在發生這種情況時生成新值。任意創建可觀察值的簡單方法是使用Subject。主題是有點「非RX-Y」,但爲了簡單起見,我將在這裏使用一個:

// All this inside the same class 

private PublishSubject<Object> updatesSubject = new PublishSubject<Object>(); 

public void SignalUpdate(){ 
    updatesSubject.onNext(new Object()); 
} 

public Observable<List<Country>> getCountriesObservable() { 
    return updatesSubject 
     .startWith(new Object) 
     .flatMap(_ -> RxUtils.createObservable(new Observable<List<Country>>() { 
      @Override 
      protected void subscribeActual(Observer<? super List<Country>> observer) { 
       try { 
        List<Country> list = mCountryDao.getCountries(); 
        observer.onNext(list == null ? Collections.<Country>emptyList() : list); 
        observer.onComplete(); 
       } catch (SQLException exc) { 
        Log.e(TAG, exc.getMessage()); 
        observer.onError(exc); 
       } 
      } 
    }); 
} 

這將每signalUpdate()被調用時返回List<Country>。爲確保您在第一次訂閱時獲得價值,我們呼籲startWith()立即發出給定值並啓動數據的第一次加載。

+0

其作品完美。謝謝。 –