2016-01-06 1956 views
1

我有一個測試代碼parallelStream()發送請求到服務器機器。如何減少Java parallelStream中的線程數量?

Report report = 
    requestsList.parallelStream() 
       .map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL)) 
       .map(response -> resultsComparer.compareToBl(response, e2EResultLongBL, 
          astarHistogramsArrayBl, latencyHistogramBl)) 
       .reduce(null, 
         (sumReport, compare2) -> 
         { 
          if (sumReport == null) { 
           sumReport = new Report(); 
          } 
          sumReport.add(compare2); 
          return sumReport; 
         }, 
         (report1, report2) -> 
         { 
          Report report3 = new Report(); 
          report3.add(report1); 
          report3.add(report2); 
          return report3; 
         }); 

這臺機器的負載太多,並且很快就會返回HTTP 404錯誤。

有兩件事情我沒有找到在谷歌的答案:

  1. 什麼是parallelStream默認線程#,如果不customed 集?
  2. 如何將工作線程的數量設置爲4?

回答

2

Stream API使用ForkJoinPool執行併發任務。引用它的文檔:

共用池是通過使用默認參數的構造默認值,但這些可以設置三個系統屬性來控制:

  • java.util.concurrent.ForkJoinPool.common.parallelism - 並行級,一個非負整數
    ...

而且

a ForkJoinPool可以用給定的目標並行性級別來構造;默認情況下,等於可用處理器的數量。

所以定製的線程數,你可以在系統屬性java.util.concurrent.ForkJoinPool.common.parallelism的值設置爲你想要的:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4") 

工作線程的數量設置爲4.默認情況下,數量線程將等於您擁有的處理器數量。

+1

更改屬性將有助於更改默認值。如果您需要使用專用ForkJoinPool的不同流量的不同值,將會是更好的解決方案。 – SubOptimal

1

ParallelStream使用ForkJoinPool.commonPool()已初始化爲您的核心數 - 1

它可以將您自己的ForkJoin Executor傳遞給parallelStream,如here所述,但是執行者必須是ForkJoin,這對於IO綁定任務來說不是最好的。

+0

謝謝。爲什麼「這不是IO界限任務的最佳選擇」? –

+0

@EladBenda任何fork連接執行器 –

1

你可以啓動你的管道自定義ForkJoinPool內,它將被用於獲取結果:

ForkJoinPool fjp = new ForkJoinPool(2); 
System.out.println("My pool: " + fjp); 
String result = CompletableFuture.supplyAsync(
    () -> Stream.of("a", "b", "c").parallel() 
    .peek(x -> System.out.println(
     ((ForkJoinWorkerThread) Thread.currentThread()).getPool())) 
    .collect(Collectors.joining()), fjp).join(); 
System.out.println(result); 

StreamEx庫增添了語法糖方法.parallel(fjp),使這個簡單的:

String result = StreamEx.of("a", "b", "c") 
    .parallel(fjp) 
    .peek(x -> System.out.println(
     ((ForkJoinWorkerThread) Thread.currentThread()).getPool())) 
    .collect(Collectors.joining());