2016-02-19 124 views
3

我已經編寫了這個代碼,使用一個Executor.newFixedThreadPool和一個ConcurrentLinkedQueue併發地構建一個對象集合。但是,我發現這整個線程池和任務提交給它詳細。我希望我可以使用流更簡潔地編寫它。使用java8流建立一個集合

private ConcurrentLinkedQueue<ApiRunner> getRunners(final ApiRunnerBuilder builder, 
     final int testSize) throws InterruptedException { 

    final ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_CORES); 
    ConcurrentLinkedQueue<ApiRunner> runners = new ConcurrentLinkedQueue<ApiRunner>(); 
    for (int i = 0; i < testSize; i++) { 
     executor.execute(() -> { 
      runners.add(builder.build()); 
     }); 
    } 
    executor.shutdown(); 
    executor.awaitTermination(300, TimeUnit.SECONDS); 
    return runners; 
} 

我想也許它可以減少到這樣的事情(對Java和新流):

private ConcurrentLinkedQueue<ApiRunner> getRunners(final ApiRunnerBuilder builder, 
     final int testSize) throws InterruptedException { 

    ConcurrentLinkedQueue<ApiRunner> runners = new ConcurrentLinkedQueue<ApiRunner>(); 
    ConcurrentLinkedQueue<ApiRunner> runners = range(testSize).parallelStream(()-> { 
      runners.add(builder.build()); 
     }); 
    } 
    return runners; 
} 

回答

4

你可以這樣做:

ConcurrentLinkedQueue<ApiRunner> runners = 
    Stream.generate(() -> builder.build()) 
      .parallel() 
      .limit(testSize) 
      .collect(toCollection(ConcurrentLinkedQueue::new)); 
+2

對於它的價值:我非常懷疑''parallel()'只會減慢代碼速度,而不會加快速度。 –

+2

值得注意的是,在這個例子中,不需要'ConcurrentLinkedQueue',因爲流實現負責正確的同步。 – Holger

+1

@LouisWasserman這可能是真的,除非''build()''是一個昂貴的操作。 –

5

既然你說,你有很多的I/O參與,我不建議使用流API。 Stream API針對CPU綁定任務進行了優化,因爲它根據CPU內核數量調整線程數量,但對於I/O綁定任務,由於線程阻塞等待完成I/O操作不佔用CPU資源。

注意,舊的API也起着新的lambda表達式不錯:

ExecutorService executor = Executors.newFixedThreadPool(DESIRED_CONCURRENCY); 
ConcurrentLinkedQueue<ApiRunner> runners = new ConcurrentLinkedQueue<ApiRunner>(); 
executor.invokeAll(Collections.nCopies(testSize,()->runners.add(builder.build()))); 
executor.shutdown(); 
return runners; 

在實際應用中,你可能會保持比執行此任務更長的時間。方法invokeAll已經等待所有任務的完成,所以在這裏既不需要關閉也不必等待終止。 shutdown調用僅在此示例中添加以進行清理,因爲在此情況下您已在方法內部創建了執行程序。