2017-08-24 128 views
2

即時構建一個多線程應用程序,使用WorkerThreads處理來自BlockingQueues的任務。 worker看起來像follws(作爲抽象類,子類實現processItem())。WorkerThread:等待處理完成(BlockingQueue)

abstract class WorkerThread extends Thread { 
    BlockingQueue<Task> q; 
    int tasksInSystem; // globally available 

    public void run() { 
     while(!interrupted()) { 
      Task t = q.take(); 
      process(t); 
      tasksInSystem--; 
     } 
    } 

    abstract void process(Task t); 
} 

特別的是,我想等待所有任務完成。 我的第一個想法是:

  • 數每增加一個任務
  • 降低用計數器處理完成後。

但是: 但也有不同的任務,不同的工人實現和多隊列。所以我將不得不維護大量不同的櫃檯。

我想什麼有:

q.waitForEmptyAndCompleted()

這需要排隊跟蹤「飛行」的任務,並要求工作進程,當他們完成信號(而不是tasksInsystem---;)。

工作人員無法增加該計數器,因爲他必須在將他們從隊列中取出之後對其進行計數。但是另一個線程可能會在take()調用之後立即運行,以至於工作人員無法事先增加計數器。

因此,計數器增加和take()必須連在一起(atomar)。這導致我到一個專門的BlockingQueue。

我沒有找到預先制定的解決方案。所以我最好的猜測是實現我自己的BlockingQueue。是否有我可以使用的東西(爲了避免自己實現並測試線程安全的阻塞隊列)?或者你有什麼想法以不同的方式實施這種等待電話?

+2

你可以使用'ExecutorService'並調用shutdown嗎? Executor服務將管理所有任務,您需要將隊列與它關聯。關機將阻止,直到所有任務完成。 – hgrey

+0

嘿,謝謝你的回答。我接近實際使用ExecutorService。但是當我調用shutdown()時,整個服務停止。我實際上想重用隊列。 waitForEmptyAndComplete()後發生的事情將觸發新的任務。所以我必須爲每一輪設置一個新的ExecutorService。至少,它不會強迫我實現我自己的Queue-Variant。 – markus

回答

1

好的,因爲一般ExecutorService是不夠的,或許ForkJoinPool將工作,它不顯式公開隊列,但應該是非常容易使用給你所描述的。

關鍵方法是awaitQuiescence(long timeout, TimeUnit unit)它將等待,直到所有提交的任務已完成執行。