2014-11-02 73 views
2

幫助撰寫多個網絡電話並在Rxjava中累積結果。 (我正在使用的Android應用程序。)撰寫多個網絡電話RxJava - Android

State 
-- List<City> cityList; 

City 
- cityId; 

RestCall 1 
Observable<State> stateRequest = restService.getStates(); 

RestCall 2 
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId); 

在UI我有一種讓每個城市的所有細節後顯示的城市列表,然後顯示在列表視圖。 我如何實現parllel網絡調用並累積結果。 ?

我想把所有的城市細節結果放在源狀態「對象」的列表中。由於國家對象也有一些信息需要進行分析。這是可能的嗎?

stateRequest ??? 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Subscriber<State>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(State result) { 
    // Get city list and display 
    } 
}); 

我檢查了這個例子,它顯示了我們如何能更多地拉一個可觀察的響應。下面的代碼片段顯示了3個可觀察組合。 但在我的情況下,我不得不做20個網絡電話並行或順序(我的意思是在後臺,但一個接一個)。我如何做到這一點。任何幫助或指示?

https://gist.github.com/skehlet/9418379

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() { 
    @Override 
    public Map<String, String> call(String s, Integer integer, Integer integer2) { 
     Map<String, String> map = new HashMap<String, String>(); 
     map.put("f3", s); 
     map.put("f4", String.valueOf(integer)); 
     map.put("f5", String.valueOf(integer2)); 
     return map; 
    } 

回答

3

我覺得你的代碼可以簡化爲這樣的事情,作爲您使用zip運營商的接近使用toList運營商的

stateRequest 
.subscribe(State state -> { 
    Observable.from(state.getCityList()) 
       .flatMap(City city -> restService.getCityDetail(city.getId()) 
       .toList() 
       .subscribe(List<City> cities -> { 

        state.clear(); 
        state.addAll(cities); 
       }); 
    }); 

由於RxJava不提供throttle運營商,你可能會構建類似如下的東西:

Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c); 

使用此限制器是一種可觀察的事物,它將每秒發射一座城市。

所以,用你的代碼,如果你想呼叫限制getCityDetail例如:

Observable<Object> limiter = Observable.interval(1, SECONDS); 
stateRequest 
.subscribe(State state -> { 
    Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c) 
       .flatMap(City city -> restService.getCityDetail(city.getId()) 
       .toList() 
       .subscribe(List<City> cities -> { 

        state.clear(); 
        state.addAll(cities); 
       }); 
    }); 
+0

間隔郵編是一個很好的方法。爲我工作。接受你的答案。 – Mani 2014-11-03 13:48:37

1
stateRequest 
.flatMap(new Func1<State, Observable<State>>() { 
    @Override 
    public Observable<State> call(final State state) { 

     List<Observable> cityObservablesList = new ArrayList<Observable>(); 

     for(City city: state.getCityList()) { 
      cityObservablesList.add(restService.getCityDetail(city.getId()); 
     } 

     Observable cityObservables = Observable.from(cityObservablesList); 
     return Observables.zip(cityObservables, new FuncN<State>() { 
      @Override 
      public State call(Object... args) { 
       List<City> cityList = state.getCityList(); 
       cityList.clear(); 
       for(Object object: args) { 
        cityList.add((City)object); 
       } 

       return state; 
      } 
     }) 
    }) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Subscriber<State>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(State result) { 
    // Get city list and display 
    } 
}); 

我把它帶拉鍊運營商和可迭代的幫助城市列表作爲第一個參數工作。 但我面臨另一個問題。由於zip以並行方式執行作業,所以10-15個網絡調用並行執行,服務器以最大每秒查詢錯誤(QPS-403)拒絕。 我如何指導zip運算符一個接一個地執行任務?

我確實通過向城市可觀測值添加延遲[delay(c * 200,TimeUnit.MILLISECONDS))]來解決此問題。但似乎不是一個合適的解決方案。

有沒有建議嗎?

+0

我的建議是要問這是一個新問題。不過,'zip'不應該並行執行這些項目--Rx具有行爲保證。我會看看你在'for'循環中對'getCityDetail'的調用。這是什麼原因造成了這個問題? – Enigmativity 2014-11-02 23:00:15

0

看看flatMap(功能..,雙功能..)。也許這就是你需要的。

statesRepository.getStates() 
    .flatMap(states-> Observable.fromIterable(states)) 
    .flatMap(
      state-> cityRepository.getStateCities(state), 
      (state, cityList) -> { 
       state.setCities(cityList); 
       return state; 
      }) 
    .subscribe(state-> showStateWithCity(state));