2011-02-06 51 views
61

我剛剛在this blog post找到CompletionService。但是,這並沒有真正展示CompletionService相對於標準ExecutorService的優勢。相同的代碼可以用任何一種書寫。那麼,CompletionService何時有用?什麼時候應該使用ExecutorService上的CompletionService?

你可以給一個簡短的代碼樣本,使其透明?例如,該代碼示例只是示出了其中不需要CompletionService(=相當於ExecutorService的)

ExecutorService taskExecutor = Executors.newCachedThreadPool(); 
    //  CompletionService<Long> taskCompletionService = 
    //    new ExecutorCompletionService<Long>(taskExecutor); 
    Callable<Long> callable = new Callable<Long>() { 
     @Override 
     public Long call() throws Exception { 
      return 1L; 
     } 
    }; 

    Future<Long> future = // taskCompletionService.submit(callable); 
     taskExecutor.submit(callable); 

    while (!future.isDone()) { 
     // Do some work... 
     System.out.println("Working on something..."); 
    } 
    try { 
     System.out.println(future.get()); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } catch (ExecutionException e) { 
     e.printStackTrace(); 
    } 

回答

78

隨着ExecutorService的,一旦你提交的任務運行,您需要手動編寫代碼以高效地獲取完成的任務結果。通過CompletionService,這是非常自動的。您提交的代碼中的差異不是很明顯,因爲您只提交一項任務。但是,想象一下你有一份要提交的任務列表。在下面的例子中,多個任務被提交給CompletionService。然後,它不要試圖找出哪個任務已經完成(以獲得結果),而是要求CompletionService實例在可用時返回結果。

public class CompletionServiceTest { 

     class CalcResult { 
      long result ; 

      CalcResult(long l){ 
       result = l; 
      } 
     } 

     class CallableTask implements Callable<CalcResult> { 
      String taskName ; 
      long input1 ; 
      int input2 ; 

      CallableTask(String name , long v1 , int v2){ 
       taskName = name; 
       input1 = v1; 
       input2 = v2 ; 
      } 

      public CalcResult call() throws Exception { 
       System.out.println(" Task " + taskName + " Started -----"); 
       for(int i=0;i<input2 ;i++){ 
        try { 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         System.out.println(" Task " + taskName + " Interrupted !! "); 
         e.printStackTrace(); 
        } 
        input1 += i; 
       } 
       System.out.println(" Task " + taskName + " Completed @@@@@@"); 
       return new CalcResult(input1) ; 
      } 

     } 

     public void test(){ 
      ExecutorService taskExecutor = Executors.newFixedThreadPool(3); 
       CompletionService<CalcResult> taskCompletionService = 
        new ExecutorCompletionService<CalcResult>(taskExecutor); 

      int submittedTasks = 5; 
      for(int i=0;i< submittedTasks;i++){ 
       taskCompletionService.submit(new CallableTask(
         String.valueOf(i), 
         (i * 10), 
         ((i * 10) + 10 ) 
         )); 
       System.out.println("Task " + String.valueOf(i) + "subitted"); 
      } 
      for(int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++){ 
       try { 
        System.out.println("trying to take from Completion service"); 
        Future<CalcResult> result = taskCompletionService.take(); 
        System.out.println("result for a task availble in queue.Trying to get()" ); 
        // above call blocks till atleast one task is completed and results availble for it 
        // but we dont have to worry which one 

        // process the result here by doing result.get() 
        CalcResult l = result.get(); 
        System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result)); 

       } catch (InterruptedException e) { 
        // Something went wrong with a task submitted 
        System.out.println("Error Interrupted exception"); 
        e.printStackTrace(); 
       } catch (ExecutionException e) { 
        // Something went wrong with the result 
        e.printStackTrace(); 
        System.out.println("Error get() threw exception"); 
       } 
      } 
     } 
    } 
+5

另一個例子參見Java Concurrency in Practice pg。有一個CompletionService用於在圖像可用時渲染圖像。 – Pete 2012-04-29 04:38:17

10

我想的Javadoc最好答案當CompletionService的方式的ExecutorService不是有用的問題。

將新異步任務的生成與已完成任務的結果消耗分離的服務。

基本上,該接口允許程序具有創建生產者和提交任務(甚至是檢查這些意見的結果),而無需瞭解任何其他消費者的這些任務的結果。同時,知道CompletionService的消費者可能會在polltake的結果中知道生產者提交任務。

爲了記錄,我可能是錯的,因爲它太晚了,但我相當確定該博客文章中的示例代碼導致內存泄漏。如果沒有積極的消費者從ExecutorCompletionService的內部隊列中獲得結果,我不確定博主是如何期待排隊的。

7

如果您想要並行執行多個任務,然後以完成順序處理它們,基本上可以使用CompletionService。所以,如果我執行5個職位,那麼CompletionService會給我第一個完成。除了提交Callable之外,只有一項任務的示例不會賦予Executor以外的額外價值。

120

省略許多細節:

  • 的ExecutorService =輸入隊列+工作者線程
  • CompletionService =輸入隊列+工作者線程+輸出隊列
4

首先,如果我們不想浪費處理器時間,我們將不會使用

while (!future.isDone()) { 
     // Do some work... 
} 

我們必須使用

service.shutdown(); 
service.awaitTermination(14, TimeUnit.DAYS); 

這段代碼壞的事情是,它將關閉ExecutorService。如果我們想繼續使用它(即我們有一些遞歸任務創建),我們有兩種選擇:invokeAll或ExecutorService

invokeAll將等待直到所有任務完成。 ExecutorService授予我們一個接一個地調查結果的能力。

而且,finily,遞歸的例子:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER); 
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); 

while (Tasks.size() > 0) { 
    for (final Task task : Tasks) { 
     completionService.submit(new Callable<String>() { 
      @Override 
      public String call() throws Exception { 
       return DoTask(task); 
      } 
     }); 
    } 

    try {     
     int taskNum = Tasks.size(); 
     Tasks.clear(); 
     for (int i = 0; i < taskNum; ++i) { 
      Result result = completionService.take().get(); 
      if (result != null) 
       Tasks.add(result.toTask()); 
     }   
    } catch (InterruptedException e) { 
    // error :(
    } catch (ExecutionException e) { 
    // error :(
    } 
} 
1

比方說,你有5個長期運行的任務(可調用任務)和您所提交的那些任務執行者服務這裏的例子。現在想象一下,如果任何一項任務完成,您不想等待所有5個任務競爭,而是想對這些任務進行某種處理。現在可以通過編寫未來對象的輪詢邏輯來完成這個任務,或者使用這個API。

相關問題