2

掛我做什麼:可觀察ZIP運營商在使用vertx HTTP客戶端

我使用vertx RX HTTP客戶端進行了大量的HTTP請求。在這個特定的情況下,我打電話給「方法A」,它返回一個ID列表。接收我需要調用方法A幾次以獲得下一批結果的所有ID。 (每次指定我想要接收的不同頁碼)

爲了提高性能並儘可能地並行調用,我創建了一個(RxJava)Observables項的列表,每個項表示一個單獨的結果頁面請求。當我完成創建這個列表時,我調用了Obserable.zip運算符,並通過了observable列表。

的問題:

使用vertx HTTP客戶端無需特別設置,一切正常,而是非常緩慢。例如在5分鐘內處理3000個http請求。

我試圖通過設置vertx HTTP客戶端選項如下,以提高性能:

HttpClientOptions options = new HttpClientOptions(); 

options.setMaxPoolSize(50) 
     .setKeepAlive(true) 
     .setPipelining(true) 
     .setTcpKeepAlive(true) 
     .setPipeliningLimit(25) 
     .setMaxWaitQueueSize(10000); 

但是當我做,我得到不穩定的結果:有時候一切正常,我能夠接收所有響應少於20秒。但是,有時外部服務器我所有呼叫關閉連接,日誌顯示以下錯誤:

io.vertx.core.http.impl.HttpClientRequestImpl 
SEVERE: io.vertx.core.VertxException: Connection was closed 
  • 我的代碼中沒有錯誤處理程序被稱爲
  • 當這個錯誤出現的zip操作掛起

這裏是一個創建HttpClientRequest代碼

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) { 
     Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> { 
      try { 
       HttpClientRequest request = httpClient.postAbs(url); 
       addHeadersToRequest(headers, request); 
       sendRequest(url, subscriber, request, body); 
      }catch (Exception e) { 
       try { 
        subscriber.onError(e); 
       }catch (Exception ex) { 
        logger.error("error calling onError for subscriber",ex); 
       }finally { 
        subscriber.onCompleted(); 
       } 
      } 
     }); 
     return bufferObservable; 
    } 

private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) { 
     final long requestId = reqNumber.getAndIncrement(); 

     if (bodyData != null) { 
      request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length); 
     } 

     request.putHeader("Accept-Encoding", "gzip,deflate"); 

     Observable<HttpRestResponse> retVal = request.toObservable() 
       .doOnError(throwable -> { 
        logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage()); 
       }).doOnNext(response -> { 
        if (response != null) { 
         logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl); 
        } 
       }).flatMap(httpClientResponse -> { 
        try { 
         if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) { 
          Observable<Buffer> bufferObservable = httpClientResponse.toObservable() 
            .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer)); 

          return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse))); 
         } 
        } catch (Exception e) { 
         logger.error("error in RestHttpClient", e); 
        } 

        return Observable.just(new HttpRestResponse(null, httpClientResponse)); 
       }); 

     retVal.subscribe(subscriber); 

     if (bodyData != null) { 
      request.end(bodyData); // write post data 
     } else { 
      request.end(); 
     } 
    } 

asdasdasd

+0

你可以發佈你創建HttpClientRequest的代碼嗎? – JustDanyul

+0

我更新了上面的帖子以包含請求創建以及如何使用它 –

回答

0

如果你認爲你可以連接你的異常的邏輯是這樣

try { 
      HttpClientRequest request = httpClient.postAbs(url); 
      addHeadersToRequest(headers, request); 
      sendRequest(url, subscriber, request, body); 
     }catch (Exception e) { 
      try { 
       subscriber.onError(e); 
      }catch (Exception ex) { 
       logger.error("error calling onError for subscriber",ex); 
      }finally { 
       subscriber.onCompleted(); 
      } 
     } 

您將不會收到任何錯誤,因爲基本上所有的處理現在的Rx生態居住,因此不會有任何報告在你的try catch塊中給你。

從這個時間點的誤差會從你的

bufferObservable.onErrorReturn() 

bufferObservable.subscribe(success, error) 
0

所以最後我想通了這一點可以向你走來。看來我通過Observable.zip方法的空列表...

這裏的問題是,onNext或onError的沒有得到被稱爲「拉鍊」方法的返回觀察的對象。在這樣的情況下,只有的onComplete叫我沒有刻意去創造......

非常感謝大家誰很感興趣,並試圖幫助處理程序。

Yaniv