Java 8中的默認「paralellStream()」使用常見的ForkJoinPool
,這可能是延遲問題,如果在提交任務時公用池線程耗盡。但是,在許多情況下,CPU功率足夠,任務時間足夠短,所以這不成問題。如果我們確實有一些長期運行的任務,這當然需要仔細考慮,但是對於這個問題,我們假設這不是問題。在Java8 parallelStream()中使用I/O + ManagedBlocker有什麼問題嗎?
但是,填充ForkJoinPool
的I/O任務實際上並沒有執行任何CPU綁定工作,這是一種引入瓶頸的方法,即使有足夠的CPU功率可用。 I understood that。不過那就是我們的ManagedBlocker
。因此,如果我們有I/O任務,我們應該只允許ForkJoinPool
在ManagedBlocker
內管理這個任務。這聽起來非常簡單。但令我驚訝的是,使用ManagedBlocker
是一件相當複雜的API,因爲它很簡單。畢竟,我認爲這是一個普遍的問題。所以,我剛剛建立了一個簡單實用的方法,使ManagedBlocker
很容易爲普通情況下使用:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
現在,如果我想下載了幾個使用並聯網站的HTML代碼,我可以這樣給它不在I/O造成任何麻煩:
public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
我感到有點驚訝的是有像BlockingTasks
以上的Java運(?或者我沒找到它)沒有階級,但它並不難建立。
當我谷歌的「Java的8位並行數據流:」我在第一時間拿到四個結果那些聲稱,由於I/O問題叉/加入吸在Java中的文章:
- https://dzone.com/articles/think-twice-using-java-8
- http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/(至少提到
ManagedBlocker
也說「在不同使用情況下,你可以給它一個ManagedBlocker實例」。它沒有提到爲什麼在這種情況下。
我已經改變了我的搜索條件,雖然有很多人抱怨生活有多可怕,但我沒有發現任何人談論上述解決方案。由於我不喜歡馬文(大腦像行星)和Java 8可用一段時間,我懷疑我在那裏提出的東西有一些可怕的錯誤。
我撞在一起的小測試:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
我跑了一個得到以下結果:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
所以,我的8個CPU的計算機上ForkJoinPool
自然會選擇8個線程,完成了第一次8個任務,最後是最後兩個任務,這意味着這需要20秒,如果有其他任務排隊,池可能仍然沒有使用閒置的CPU(除了最近10秒內的6個內核)。
然後我用...
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
...而不是...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
...,得到了以下結果:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
在我看來是這樣工作的,額外的線程被啓動,以彌補我的模擬「阻塞I/O行動」(睡眠)。時間減少到10秒,我想如果我排隊更多的任務,那些仍然可以使用可用的CPU功率。
如果I/O操作被封裝在ManagedBlock
中,此解決方案或通常在流中使用I/O是否有任何問題?
分解的好處。這當然會限制單個任務可能花費的時間,但是如果隊列中有足夠的其他計算任務,它將不會限制總吞吐量。結論:如果只有吞吐量是個問題,那麼解決方案很好。如果單個I/O任務的響應時間是一個問題,並且所涉及的單個I/O任務可以在更多步驟中分解,則應考慮其他解決方案。 – yankee
不要忘記:Stream API使用Fork/Join是一個實現細節。只要Streams不保證使用該框架,就不能保證使用'ManagedBlocker'將會提高併發性...... – Holger