2011-02-16 36 views
2

我有一個簡單的web服務在Tomcat容器內運行,這本質上是多線程的。在進入服務的每個請求中,我想對外部服務進行併發呼叫。 java.util.concurrent中的ExecutorCompletionService部分讓我感受到了。我可以爲它提供一個線程池,它將負責執行我的併發調用,並在任何結果準備就緒時通知我。我該如何實現或找到線程安全的CompletionService的等價物?

來處理特定的傳入請求可能看起來像代碼:

void handleRequest(Integer[] input) { 
    // Submit tasks 
    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool()); 
    for (final Integer i : input) { 
     completionService.submit(new Callable<Integer>() { 
      public Integer call() { 
       return -1 * i; 
      } 
     }); 
    } 

    // Do other stuff... 

    // Get task results 
    try { 
     for (int i = 0; i < input.size; i++) { 
      Future<Integer> future = completionService.take(); 
      Integer result = future.get(); 
      // Do something with the result... 
     } 
    } catch (Exception e) { 
     // Handle exception 
    } 
} 

這應該做工精細和花花公子,但效率非常低,因爲一個新的線程池被分配給每個傳入的請求。如果我將CompletionService作爲共享實例移出,我將遇到多個請求共享相同CompletionService和線程池的線程安全問題。當請求提交任務並獲得結果時,結果不會是他們提交的結果。

因此,我需要的是一個線程安全的CompletionService,它允許我在所有傳入請求中共享一個公共線程池。當每個線程完成一項任務時,應該通知傳入請求的適當線程,以便它可以收集結果。

什麼是最直接的方式來實現這種功能?我相信這種模式已經被應用了很多次,我只是不確定這是Java併發庫提供的東西,還是可以使用某些Java併發構建塊輕鬆構建。

UPDATE:一個警告我忘了提及的是,我希望被任何的我提交的任務完成會立即通知。這是使用CompletionService的主要優勢,因爲它可以分離任務和結果的生產和消耗。我實際上並不關心我得到結果的順序,而且我希望避免不必要的阻塞,等待結果按順序返回。

+0

我是否明白,有一個輸入請求需要啓動input.length()線程,並且需要在所有線程完成後執行一些操作? – 2011-02-16 08:24:11

+0

這是部分正確的。我想要啓動input.length()線程,並且想要在線程完成後立即處理結果,而不管順序如何。我不關心我得到結果的順序,只是在其中任何一個結束時我都會收到通知。 – pmc255 2011-02-16 18:57:29

回答

2

您分享Executor但不是CompletionService

我們有一個AsyncCompleter所做的正是這一點,並處理所有的簿記,讓您:在回報和塊,直到結果的順序

Iterable<Callable<A>> jobs = jobs(); 
Iterable<A> results async.invokeAll(jobs); 

results迭代可

+0

我認爲這是我正在尋找的。我其實不關心結果的完成順序,並且希望在任何任務完成時得到通知。你能詳細說明你的AsyncCompleter如何跟蹤提交的作業,並在任何作業完成時通知被阻塞的線程? – pmc255 2011-02-16 18:13:28

0

爲什麼你需要一個CompletionService

每個線程都可以簡單地在ExecutorService的「常規」和共享實例上提交或調用Callables。然後每個線程保留自己的私人Future引用。

另外,Executor及其後代在設計上是線程安全的。你真正想要的是每個線程都可以創建自己的任務並檢查其結果。

Javadoc在java.util.concurrent是優秀的;它包括使用模式和示例。請閱讀ExecutorService和其他類型的文檔以更好地瞭解如何使用它們。

+0

查看我的更新和上面的評論。我忘記說的主要是我不關心結果返回的順序。因此,堅持Future對象並遍歷它們可能會導致不必要的阻塞;我想處理第一個結果,一旦完成,無論順序如何。 – pmc255 2011-02-16 18:06:11

1

您可以使用普通的共享ExecutorService。無論何時提交任務,您都會爲您剛剛提交的任務獲得Future。您可以將它們全部存儲在列表中並稍後查詢它們。

例子:

private final ExecutorService service = ...//a single, shared instance 

void handleRequest(Integer[] input) { 
    // Submit tasks 
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length); 
    for (final Integer i : input) { 
     Future<Integer> future = service.submit(new Callable<Integer>() { 
      public Integer call() { 
       return -1 * i; 
      } 
     }); 
     futures.add(future); 
    } 

    // Do other stuff... 

    // Get task results 
    for(Future<Integer> f : futures){ 
     try { 
      Integer result = f.get(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

} 
+0

我剛剛更新了我的問題。您的解決方案正常工作然而,我實際上並不在乎訂購,所以使用標準的ExecutorService並重復遍歷期貨可能會導致不必要的阻塞。例如,如果我最早提交的最後一項任務是完成,我希望能夠立即處理。 ExecutorCompletionService很好地提供了它的底層隊列,因此我可以簡單地「獲取」完成的第一個結果。 – pmc255 2011-02-16 18:04:23

1

java.util中。併發提供您需要的一切。如果我正確理解你的問題,你有以下要求:

你想提交請求,並立即(在合理的範圍內)處理請求結果(Response)。那麼,我相信你已經看到了你的問題的解決方案:java.util.concurrent.CompletionService。

該服務相當簡單地結合了Executor和BlockingQueue來處理Runnable和/或Callable任務。 BlockingQueue用於保存完成的任務,您可以讓另一個線程等待,直到完成的任務排隊(這是通過調用CompletionService對象的take()完成的)。

如前面的海報所述,共享執行程序,併爲每個請求創建一個CompletionService。這似乎是一件昂貴的事情,但再次考慮CS只是與Executor和BlockingQueue進行協作。既然你分享了實例化的最昂貴的對象,即Executor,我想你會發現這是一個非常合理的成本。

但是......所有這些都說了,你似乎仍然有問題,而且這個問題似乎是請求處理與處理響應的分離。這可能會通過創建一個專門處理所有請求的響應的單獨服務或某種類型的請求來實現。這裏是一個例子: (注意:這意味着Request對象實現了Callable接口,它應該返回一個Response類型...我在這個簡單例子中省略的細節)。

class RequestHandler { 

    RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) {  
    this.responseQueue = ... 
    this.executor = ... 
    } 

    public void acceptRequest(List<Request> requestList) { 

    for(Request req : requestList) { 

     Response response = executor.submit(req); 
     responseHandler.handleResponse(response); 

    } 
    } 
} 

class ResponseHandler { 
    ReentrantLock lock; 
    ResponseHandler(ExecutorService responseExecutor) { 
    ... 
    } 

    public void handleResponse(Response res) { 
    lock.lock() { 
    try { 
     responseExecutor.submit(new ResponseWorker(res)); 
    } finally { 
     lock.unlock(); 
    }  
    } 

    private static class ResponseWorker implements Runnable { 

    ResponseWorker(Response response) { 
     response = ... 
    } 

    void processResponse() {   
     // process this response 
    } 

    public void run() {  
     processResponse();  
    } 
    } 
} 

有幾件事情要記住:一,ExecutorService執行阻塞隊列中的Callables或Runnables;你的RequestHandler接收任務,並且在Executor中入隊,並被ASAP處理。同樣的事情發生在你的ResponseHandler中;收到響應後,只要該SEPARATE執行程序可以,它就會處理該響應。總之,你有兩個執行者同時工作:一個在Request對象上,另一個在Response對象上。