2017-09-05 597 views
0

我需要一些幫助來實現使用RxJava2的並行異步調用& Retrofit2。 我的要求是;如何使用RxJava2和Retrofit2創建並行多個非阻塞服務請求

1)我有多個保險公司(現在我只需要兩個),我需要發送多個使用該保險公司名稱的並行請求。

2)如果它們中的任何一個給服務器錯誤,那麼其餘的請求不應該被阻塞。

以下是我到現在爲止所嘗試的;

ArrayList<String> arrInsurer = new ArrayList<>(); 
     arrInsurer.add(AppConstant.HDFC); 
     arrInsurer.add(AppConstant.ITGI); 

     RequestInterface service = getService(ServiceAPI.CAR_BASE_URL); 
     for (String insurerName : arrInsurer) { 
      service.viewQuote(Utils.getPrefQuoteId(QuoteListActivity.this), insurerName) 
        .subscribeOn(Schedulers.computation()) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Consumer<ViewQuoteResDTO>() { 
         @Override 
         public void accept(@NonNull ViewQuoteResDTO viewQuoteResDTO) throws Exception { 
          Log.e("Demo", viewQuoteResDTO.getPremiumData().getIDV()+""); 
          updateList(); 
         } 
        }, new Consumer<Throwable>() { 
         @Override 
         public void accept(@NonNull Throwable throwable) throws Exception { 
          Log.e("Demo", throwable.getMessage()); 
         } 
        }); 
     } 

private RequestInterface getService(String baseUrl) {  
    Gson gson = new GsonBuilder() 
      .setLenient() 
      .create(); 

    return new Retrofit.Builder() 
      .baseUrl(baseUrl)     
      .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create(gson)) 
      .build().create(RequestInterface.class); 

} 

現在,上面的代碼只有在兩個請求都能成功響應的情況下才能正常工作。但是當任何請求作爲內部服務器錯誤發出響應時,請求的其餘部分也會被阻塞。

下面的任何一個請求給出的日誌錯誤我得到失敗響應;

E/Demo: HTTP 500 Aww Snap, Some thing happened at server. Please try back again later. 
E/Demo: unexpected end of stream on Connection{100.xxx.xxx.xx:portNo, [email protected] hostAddress=/100.xxx.xxx.xx:portNo cipherSuite=none protocol=http/1.1} 

如何處理這個錯誤?

回答

1

我想像任何其他Rx相關的問題,這有多個答案。我會在我們的應用程序中使用我的應用程序,並解決這個用例。希望能幫助到你。

短版本 - 這依賴於mergeDelayError。檢查出here

爲什麼merge?因爲與concat不同,它將並行執行觀察值。爲什麼mergeDelayError?它延遲了錯誤...本質上它會執行每個可觀察的事物並在所有事情完成時傳遞錯誤。這可以確保即使出現一個或多個錯誤,其他錯誤仍會被執行。

你必須小心一些細節。事件順序不再保留,這意味着merge運算符可能會交織一些可觀察事件(鑑於您以前如何做事,這不應該成爲問題)。據我所知,即使多個可疑事件失敗,你只會得到一個onError的電話。如果這兩個都ok,那麼你可以嘗試以下方法:

List<Observable<ViewQuoteResDTO>> observables = new ArrayList<>(); 
for (String insurerName : arrInsurer) {  
    observables.add(service.viewQuote(
     Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)); 
} 

Observable.mergeDelayError(observables) 
      .subscribeOn(Schedulers.computation()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(/* subscriber calls if you need them */); 

的想法是創建一個你要運行,然後使用mergeDelayError觸發所有這些觀測。