最明顯的解決方案是使用whenCompleteAsync
而不是whenComplete
,因爲前者保證使用提供的Executor
執行操作而不是調用線程。可與
Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); };
for(int run = 0; run<2; run++) {
boolean completed = run==0;
System.out.println("*** "+(completed? "with already completed": "with async"));
CompletableFuture<String> source = completed?
CompletableFuture.completedFuture("created in "+Thread.currentThread()):
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return "created in "+Thread.currentThread();
}, ex);
source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex)
.whenCompleteAsync((s,t) -> {
if(t!=null) t.printStackTrace(); else System.out.println(s);
System.out.println("consumed in "+Thread.currentThread());
}, ex)
.join();
}
得到證實,這將打印出類似這樣
*** with already completed
job scheduled
job scheduled
created in Thread[main,5,main]
processed in Thread[Thread-0,5,main]
consumed in Thread[Thread-1,5,main]
*** with async
job scheduled
job scheduled
job scheduled
created in Thread[Thread-2,5,main]
processed in Thread[Thread-3,5,main]
consumed in Thread[Thread-4,5,main]
所以你可以只使用
taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
return future;
});
如果提前完成有顯著的可能性,你可以使用減少開銷
taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
if(future.isDone()) future = null;
else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
return future;
});
也許,你沒有找到這個明顯的解決方案,因爲你不喜歡依賴操作總是作爲一個新任務被調度到池中,即使完成發生在不同的任務中。你可以用一個專門的執行,將重新安排僅在必要時任務解決這個問題:
Executor inPlace = Runnable::run;
Thread forbidden = Thread.currentThread();
Executor forceBackground
= r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r);
…
future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground);
但是你可能會重新考慮這個複雜的每個映射清理邏輯是否真的需要的話。它不僅複雜,而且可能會產生顯着的開銷,可能會安排大量的清理操作,而這些清理操作在執行時已經過時並不是真正需要的。
這可能是更簡單,更有效地從時間執行
taskMap.values().removeIf(CompletableFuture::isDone);
時間清理整個地圖。