掛我做什麼:可觀察ZIP運營商在使用vertx HTTP客戶端
我使用vertx RX HTTP客戶端進行了大量的HTTP請求。在這個特定的情況下,我打電話給「方法A」,它返回一個ID列表。接收我需要調用方法A幾次以獲得下一批結果的所有ID。 (每次指定我想要接收的不同頁碼)
爲了提高性能並儘可能地並行調用,我創建了一個(RxJava)Observables項的列表,每個項表示一個單獨的結果頁面請求。當我完成創建這個列表時,我調用了Obserable.zip運算符,並通過了observable列表。
的問題:
使用vertx HTTP客戶端無需特別設置,一切正常,而是非常緩慢。例如在5分鐘內處理3000個http請求。
我試圖通過設置vertx HTTP客戶端選項如下,以提高性能:
HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
.setKeepAlive(true)
.setPipelining(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(25)
.setMaxWaitQueueSize(10000);
但是當我做,我得到不穩定的結果:有時候一切正常,我能夠接收所有響應少於20秒。但是,有時外部服務器我所有呼叫關閉連接,日誌顯示以下錯誤:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
- 我的代碼中沒有錯誤處理程序被稱爲
- 當這個錯誤出現的zip操作掛起
這裏是一個創建HttpClientRequest代碼
public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
});
return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
final long requestId = reqNumber.getAndIncrement();
if (bodyData != null) {
request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
}
request.putHeader("Accept-Encoding", "gzip,deflate");
Observable<HttpRestResponse> retVal = request.toObservable()
.doOnError(throwable -> {
logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
}).doOnNext(response -> {
if (response != null) {
logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
}
}).flatMap(httpClientResponse -> {
try {
if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
.reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
}
} catch (Exception e) {
logger.error("error in RestHttpClient", e);
}
return Observable.just(new HttpRestResponse(null, httpClientResponse));
});
retVal.subscribe(subscriber);
if (bodyData != null) {
request.end(bodyData); // write post data
} else {
request.end();
}
}
asdasdasd
你可以發佈你創建HttpClientRequest的代碼嗎? – JustDanyul
我更新了上面的帖子以包含請求創建以及如何使用它 –