1

我有以下代碼:如何防止在上下文線程CompletableFuture#whenComplete執行

ConcurrentHashMap taskMap= new ConcurrentHashMap(); 
.... 
taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenComplete((r, e) -> taskMap.remove(key, future));    
     return future; 
    }); 

這個代碼在future已經完成whenComplete函數參數的情況下調用在同一線程compute所調用的問題。在這個方法的主體中,我們從地圖中刪除條目。但計算方法文檔禁止此應用程序凍結。

我該如何解決這個問題?

回答

0

最明顯的解決方案是使用whenCompleteAsync而不是whenComplete,因爲前者保證使用提供的Executor執行操作而不是調用線程。可與

Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); }; 
for(int run = 0; run<2; run++) { 
    boolean completed = run==0; 
    System.out.println("*** "+(completed? "with already completed": "with async")); 
    CompletableFuture<String> source = completed? 
     CompletableFuture.completedFuture("created in "+Thread.currentThread()): 
     CompletableFuture.supplyAsync(() -> { 
      LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); 
      return "created in "+Thread.currentThread(); 
     }, ex); 

    source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex) 
      .whenCompleteAsync((s,t) -> { 
       if(t!=null) t.printStackTrace(); else System.out.println(s); 
       System.out.println("consumed in "+Thread.currentThread()); 
      }, ex) 
      .join(); 
} 

得到證實,這將打印出類似這樣

*** with already completed 
job scheduled 
job scheduled 
created in Thread[main,5,main] 
processed in Thread[Thread-0,5,main] 
consumed in Thread[Thread-1,5,main] 
*** with async 
job scheduled 
job scheduled 
job scheduled 
created in Thread[Thread-2,5,main] 
processed in Thread[Thread-3,5,main] 
consumed in Thread[Thread-4,5,main] 

所以你可以只使用

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

如果提前完成有顯著的可能性,你可以使用減少開銷

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     if(future.isDone()) future = null; 
     else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

也許,你沒有找到這個明顯的解決方案,因爲你不喜歡依賴操作總是作爲一個新任務被調度到池中,即使完成發生在不同的任務中。你可以用一個專門的執行,將重新安排僅在必要時任務解決這個問題:

Executor inPlace = Runnable::run; 
Thread forbidden = Thread.currentThread(); 
Executor forceBackground 
     = r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r); 

… 

future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground); 

但是你可能會重新考慮這個複雜的每個映射清理邏輯是否真的需要的話。它不僅複雜,而且可能會產生顯着的開銷,可能會安排大量的清理操作,而這些清理操作在執行時已經過時並不是真正需要的。

這可能是更簡單,更有效地從時間執行

taskMap.values().removeIf(CompletableFuture::isDone); 

時間清理整個地圖。