2016-04-29 59 views
2

我想調用CompletableFuture.supplyAsync()將阻塞任務委託給另一個線程。一旦完成任務,我希望CompletableFuture.thenAccept使用者在調用線程的上下文中運行。從調用線程運行CompletableFuture.thenAccept?

例如:

// Thread 1 

CompletableFuture.supplyAsync(() -> { 
    // Thread 2 

    return BlockingMethod(); 
}).thenAccept((
    Object r) -> { 

    // Thread 1 
}); 

以下代碼表明CompletableFuture.thenAccept運行在其自己的線程;可能是相同的池CompletableFuture.supplyAsync我得到相同的線程ID,當我運行它:

System.out.println("Sync thread supply " + Thread.currentThread().getId()); 

CompletableFuture.supplyAsync(() -> { 

    System.out.println("Async thread " + Thread.currentThread().getId()); 

    try { 
     Thread.sleep(2000); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 

    return true; 
}).thenAccept((
    Boolean r) -> { 

    System.out.println("Sync thread consume " + Thread.currentThread().getId()); 
}); 

Thread.sleep(3000); 

是否有可能同時有與調用線程CompletableFuture.thenAccept運行?

回答

3

CompletableFuture只會執行ConsumerthenAccept註冊接收CompletableFuture(由supplyAsync返回一個)完成時,因爲它需要它與完成值。

如果調用thenAccept時接收方CompletableFuture已完成,則Consumer將在調用線程中執行。否則,它將執行任何線程完成Supplier提交給supplyAsync

是否有可能同時有運行CompletableFuture.thenAccept與 調用線程?

這是一個令人困惑的問題,因爲線程一次只能運行一件事。對於單個線程,沒有併發同時是跨越多個線程的屬性。

如果你想Consumer,直到將來完成了對CompletableFuture調用thenAccept,然後join在同一線程上運行,阻止該線程。然後您可以自己執行Consumer或致電thenAccept爲您執行它。

例如

CompletableFuture<Boolean> receiver = CompletableFuture.supplyAsync(() -> { 
    System.out.println("Async thread " + Thread.currentThread().getId()); 

    try { 
     Thread.sleep(2000); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

    return true; 
}); 

receiver.join(); 
Consumer<Boolean> consumer = (Boolean r) -> { 
    System.out.println("Sync thread consume " + Thread.currentThread().getId()); 
}; 

consumer.accept(receiver.get()); 

(異常處理省略)


如果你想Consumer並行與供給supplyAsyncSupplier運行,這是不可能的。 Consumer意味着消耗Supplier產生的價值。該值在Supplier完成之前不可用。

0

如果我知道你要到餐​​桌的I/O密集型任務,你的想法的權利,但做的所有處理中的「事件循環」(認爲JavaScript)的,那麼你的代碼可以被轉換爲

Executor eventLoop = Executors.newSingleThreadExecutor(); 
Executor ioMultiplexor = Executors.newCachedThreadPool(); 

eventLoop.execute(() -> { 
    System.out.println("event loop thread supply " + Thread.currentThread().getId()); 
    CompletableFuture.supplyAsync(() -> { 
     System.out.println("I/O multiplexor thread " + Thread.currentThread().getId()); 
     try { 
      Thread.sleep(2000); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     return true; 
    }, ioMultiplexor).thenAcceptAsync((Boolean r) -> { 
     System.out.println("event loop thread consume " + Thread.currentThread().getId()); 
    }, eventLoop); 
}); 

Thread.sleep(3000); 
// shut down executors 

這將打印

event loop thread supply 10 
I/O multiplexor thread 11 
event loop thread consume 10 

如果此代碼用於一些請求處理,其中可以有很多併發請求,你可能會希望有一個全球eventLoop和一個全局ioMultiplexer,而你也將要釋放的主請求韓德林g線程完成後將任務提交到eventLoop