2016-05-29 458 views
10

Java 8中的默認「paralellStream()」使用常見的ForkJoinPool,這可能是延遲問題,如果在提交任務時公用池線程耗盡。但是,在許多情況下,CPU功率足夠,任務時間足夠短,所以這不成問題。如果我們確實有一些長期運行的任務,這當然需要仔細考慮,但是對於這個問題,我們假設這不是問題。在Java8 parallelStream()中使用I/O + ManagedBlocker有什麼問題嗎?

但是,填充ForkJoinPool的I/O任務實際上並沒有執行任何CPU綁定工作,這是一種引入瓶頸的方法,即使有足夠的CPU功率可用。 I understood that。不過那就是我們的ManagedBlocker。因此,如果我們有I/O任務,我們應該只允許ForkJoinPoolManagedBlocker內管理這個任務。這聽起來非常簡單。但令我驚訝的是,使用ManagedBlocker是一件相當複雜的API,因爲它很簡單。畢竟,我認爲這是一個普遍的問題。所以,我剛剛建立了一個簡單實用的方法,使ManagedBlocker很容易爲普通情況下使用:

public class BlockingTasks { 

    public static<T> T callInManagedBlock(final Supplier<T> supplier) { 
     final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier); 
     try { 
      ForkJoinPool.managedBlock(managedBlock); 
     } catch (InterruptedException e) { 
      throw new Error(e); 
     } 
     return managedBlock.getResult(); 
    } 

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker { 
     private final Supplier<T> supplier; 
     private T result; 
     private boolean done = false; 

     private SupplierManagedBlock(final Supplier<T> supplier) { 
      this.supplier = supplier; 
     } 

     @Override 
     public boolean block() { 
      result = supplier.get(); 
      done = true; 
      return true; 
     } 

     @Override 
     public boolean isReleasable() { 
      return done; 
     } 

     public T getResult() { 
      return result; 
     } 
    } 
} 

現在,如果我想下載了幾個使用並聯網站的HTML代碼,我可以這樣給它不在I/O造成任何麻煩:

public static void main(String[] args) { 
    final List<String> pagesHtml = Stream 
     .of("https://google.com", "https://stackoverflow.com", "...") 
     .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url))) 
     .collect(Collectors.toList()); 
} 

我感到有點驚訝的是有像BlockingTasks以上的Java運(?或者我沒找到它)沒有階級,但它並不難建立。

當我谷歌的「Java的8位並行數據流:」我在第一時間拿到四個結果那些聲稱,由於I/O問題叉/加入吸在Java中的文章:

我已經改變了我的搜索條件,雖然有很多人抱怨生活有多可怕,但我沒有發現任何人談論上述解決方案。由於我不喜歡馬文(大腦像行星)和Java 8可用一段時間,我懷疑我在那裏提出的東西有一些可怕的錯誤。

我撞在一起的小測試:

public static void main(String[] args) { 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start"); 
    IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End"); 
} 

public static void sleep() { 
    try { 
     System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName()); 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     throw new Error(e); 
    } 
} 

我跑了一個得到以下結果:

18:41:29.021: Start 
18:41:29.033: Sleeping main 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7 
18:41:39.034: Sleeping main 
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:49.035: End 

所以,我的8個CPU的計算機上ForkJoinPool自然會選擇8個線程,完成了第一次8個任務,最後是最後兩個任務,這意味着這需要20秒,如果有其他任務排隊,池可能仍然沒有使用閒置的CPU(除了最近10秒內的6個內核)。

然後我用...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; })); 

...而不是...

IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 

...,得到了以下結果:

18:44:10.93: Start 
18:44:10.945: Sleeping main 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3 
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11 
18:44:20.957: End 

在我看來是這樣工作的,額外的線程被啓動,以彌補我的模擬「阻塞I/O行動」(睡眠)。時間減少到10秒,我想如果我排隊更多的任務,那些仍然可以使用可用的CPU功率。

如果I/O操作被封裝在ManagedBlock中,此解決方案或通常在流中使用I/O是否有任何問題?

回答

6

總之,是的,您的解決方案存在一些問題。它明顯改善了在並行流中使用阻塞代碼的能力,並且一些第三方庫提供了類似的解決方案(例如,請參見jOOλ庫中的Blocking類)。但是,此解決方案不會更改Stream API中使用的內部分割策略。通過流API創建的子任務的數量由預定的常數AbstractTask類控制:

/** 
* Default target factor of leaf tasks for parallel decomposition. 
* To allow load balancing, we over-partition, currently to approximately 
* four tasks per processor, which enables others to help out 
* if leaf tasks are uneven or some processors are otherwise busy. 
*/ 
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 

正如你可以看到它比普通池並行(這是由CPU內核的默認數量)大四倍。真正的分割算法有點棘手,但即使它們全部被阻塞,大致也不會有超過4x-8x的任務。例如,如果您有8個CPU內核,則Thread.sleep()測試將很好地工作到IntStream.range(0, 32)(因爲32 = 8 * 4)。但是對於IntStream.range(0, 64),您將有32個並行任務,每個處理兩個輸入數字,因此整個處理需要20秒,而不是10個。

+0

分解的好處。這當然會限制單個任務可能花費的時間,但是如果隊列中有足夠的其他計算任務,它將不會限制總吞吐量。結論:如果只有吞吐量是個問題,那麼解決方案很好。如果單個I/O任務的響應時間是一個問題,並且所涉及的單個I/O任務可以在更多步驟中分解,則應考慮其他解決方案。 – yankee

+3

不要忘記:Stream API使用Fork/Join是一個實現細節。只要Streams不保證使用該框架,就不能保證使用'ManagedBlocker'將會提高併發性...... – Holger