2017-08-08 58 views
1

我試圖用Vert.x做非常簡單的SSH客戶端。由於我沒有非阻塞的SSH庫,所以我必須處理rxExecuteBlocking中的所有內容。它的工作的偉大,當我運行的所有邏輯代碼的一個大塊如下:方法rxExecuteBlocking消耗所有結果 - ssh客戶端

public Single<String> exec() { 
    return vertx.rxExecuteBlocking(f -> { 
     String result = ""; 

     // connect() 
     // exec() 
     // close() 

     f.complete(result); 
    }, false); 
} 

// hostnames :: Observalbe<String> 
hostnames() 
    .filter() 
    .flatMapSingle(this::exec) 
    .moreCalls() 
    .subscribe(); // OK 

我寧願有connect()exec()close() separeted和調用諸如:

hostnames() 
    .filter() 
    .flatMapSingle(this::connect) 
    .moreCalls() 
    .flatMapSingle(this::exec) 
    .moreCalls() 
    .flatMapSingle(this::close) 
    .subscribe(); 

public Single<Connection> connect() { 
    return vertx.rxExecuteBlocking(f -> { 
    // connect 
    }, false); 
} 

public Single<Connection> exec() { 
    return vertx.rxExecuteBlocking(f -> { 
    // exec 
    }, false); 
} 

鏈停止在flatMapSingle(this::connect)運行多個片阻斷代碼的時,消耗來自的所有結果先創建(建立所有連接),然後繼續連鎖。由於所有連接都在內存中,此行爲會消耗相當多的資源(此行爲會提醒我reduce()collect()

所需結果將不會停止在鏈中並繼續,釋放資源並對每個事件執行此操作。

有沒有辦法做到這一點?

在此先感謝。

+0

看起來像你的工作者池大小(默認值爲20)接近連接數量,因此預計,多個並行'連接'消耗所有可用的線程和鏈不會繼續,直到某些線程被釋放 – zella

+0

感謝您的建議!好點,我試過增加游泳池的大小,並沒有使用太多的連接,但它沒有幫助。 –

回答

0

我會建議嘗試使用重載的flatMap,它將在特定管道階段的最大併發訂閱觀察值數作爲參數。假設默認情況下工作線程池中有20個線程,那麼可以給每個flatMap調用分配一部分池。 5到每個。

hostnames() 
    // ...some filtering 
    .flatMap(hostname -> this.connect(hostname).toObservable(), 5) 
    // ...more operators 
    .flatMap(connection -> this.exec(connection).toObservable(), 5) 
    // ...more operators 
    .flatMap(connection -> this.close(connection).toObservable(), 5) 
    .subscribe(); 

這將確保不是整個線程池在同一時刻使用。

可能需要對併發負載進行一些調整。例如,如果connectexec快,則connectexec的更少同時訂閱的可觀察量。因此,在exec之前,connect的結果不會堆積在緩衝區中。