我試圖用Vert.x做非常簡單的SSH客戶端。由於我沒有非阻塞的SSH庫,所以我必須處理rxExecuteBlocking
中的所有內容。它的工作的偉大,當我運行的所有邏輯代碼的一個大塊如下:方法rxExecuteBlocking消耗所有結果 - ssh客戶端
public Single<String> exec() {
return vertx.rxExecuteBlocking(f -> {
String result = "";
// connect()
// exec()
// close()
f.complete(result);
}, false);
}
// hostnames :: Observalbe<String>
hostnames()
.filter()
.flatMapSingle(this::exec)
.moreCalls()
.subscribe(); // OK
我寧願有connect()
,exec()
,close()
separeted和調用諸如:
hostnames()
.filter()
.flatMapSingle(this::connect)
.moreCalls()
.flatMapSingle(this::exec)
.moreCalls()
.flatMapSingle(this::close)
.subscribe();
但
public Single<Connection> connect() {
return vertx.rxExecuteBlocking(f -> {
// connect
}, false);
}
public Single<Connection> exec() {
return vertx.rxExecuteBlocking(f -> {
// exec
}, false);
}
鏈停止在flatMapSingle(this::connect)
運行多個片阻斷代碼的時,消耗來自的所有結果先創建(建立所有連接),然後繼續連鎖。由於所有連接都在內存中,此行爲會消耗相當多的資源(此行爲會提醒我reduce()
或collect()
)
所需結果將不會停止在鏈中並繼續,釋放資源並對每個事件執行此操作。
有沒有辦法做到這一點?
在此先感謝。
看起來像你的工作者池大小(默認值爲20)接近連接數量,因此預計,多個並行'連接'消耗所有可用的線程和鏈不會繼續,直到某些線程被釋放 – zella
感謝您的建議!好點,我試過增加游泳池的大小,並沒有使用太多的連接,但它沒有幫助。 –