2017-02-21 410 views
4

下面的示例代碼我正在注入一個biconsumer睡眠100毫米作爲一套完整的未來的完成行動。我已經使用whenCompleteAsync方法,通過單獨使用executorServiceexecutorServiceThreadPoolExecutor與芯池大小5,最大尺寸和5 1.如何捕獲CompletableFuture的whenCompleteAsync調用中拋出的RejectedExecutionException?

public class CompleteTest { 
    public static void main(String[] args) { 
     ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10, 
       TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); 

     ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 

     for (int i = 0; i <100; i++) { 
      CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
      stringCompletableFuture.whenCompleteAsync((e, a) -> { 
       System.out.println("Complete " + e); 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) {e1.printStackTrace();} 
      }, executorService); 

      list.add(stringCompletableFuture); 
     } 

     for (int i = 0; i < list.size(); i++) { 
      list.get(i).complete(i + ""); 
     } 
    } 
} 

當我跑的代碼的隊列長度,即使我完成100個期貨僅6輸出得到打印。這是5個核心線程和1個排隊的線程。剩下的事發生了什麼?如果由於隊列已滿而導致其他可運行列表無法提交給執行程序服務,則不應該出現異常。

輸出

Complete 0 
Complete 1 
Complete 2 
Complete 3 
Complete 4 
Complete 5 

回答

5

拋出一個異常,並且CompletableFuture分外完成,只是那些不是任何你追蹤。

您正在使用構造函數實例化並初始化ThreadPoolExecutor,該構造函數使用僅引發異常的默認RejectedExecutionHandler。我們知道如果ExecutorService無法接受任務,則會拋出RejectedExecutionException。那麼添加的任務在哪裏以及拋出的異常在哪裏?

就目前而言,所有的鏈接發生在whenCompleteAsync之內。當你打電話給你的時候,你在接收者CompletableFuturestringCompletableFuture上添加一個依賴關係。當stringCompletableFuture完成(在這種情況下成功)時,它將創建一個新的CompletableFuture(它返回)並嘗試安排給定的ExecutorService上給定的BiConsumer

由於ExecutorService的隊列沒有空間,它會調用RejectedExecutionHandler,這將拋出RejectedExecutionException。該例外情況在當時被捕獲,並用於將被退回。

換句話說,在您的for循環中,捕獲由whenCompleteAsync返回的CompletableFuture,將其存儲並打印出其狀態。

ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>(); 
for (int i = 0; i <100; i++) { 
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> { 
     System.out.println("Complete " + e); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e1) {e1.printStackTrace();} 
    }, executorService); 
    dependents.add(thisWillHaveException); 
    list.add(stringCompletableFuture); 
} 

for (int i = 0; i < list.size(); i++) { 
    list.get(i).complete(i + ""); 
} 
Thread.sleep(2000); 
dependents.forEach(cf -> { 
    cf.whenComplete((r, e) -> { 
     if (e != null) 
      System.out.println(cf + " " + e.getMessage()); 
    }); 
}); 

你會發現,他們都是(除了已成功先前打印的6)用RejectedExecutionException格外完成。

... 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
相關問題