2011-11-30 86 views
3

我有一種情況,我期望添加到線程池比處理更快。我不認爲一個無限的隊列將是一個好主意,因爲如果不加限制,隊列中的數據可能會增加以吞噬所有的內存。鑑於此,我試圖確定一個ThreadPoolExecutor的正確設置。ThreadPoolExecutor配置

我的第一個想法是一個固定的線程池,帶有直接切換和調用者運行失敗策略。但是我不知道這是否會影響吞吐量(因爲每次調用者運行策略被調用時,線程池任務可能會完成並閒置一段時間)。

另一個想法是固定線程池與ArrayBlockingQueue,但我其實不確定的行爲。我希望這意味着Executor更喜歡在小於coreThread大小的情況下創建線程,然後隊列,如果隊列已滿,則會阻止等待隊列獲取空間。但在這裏閱讀文檔:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

它似乎更喜歡創建線程高達corePoolSize,再加入到隊列中,如果隊列已滿,它會嘗試創建線程高達maxThreads(同作爲這種情況下的coreThreads),否則它將運行失敗策略。

有人可以澄清上述情況的行爲?並且還建議這種特殊情況下的最佳設置(我的一個建議或其他可能更好的建議)?

+0

對於你的用例,我建議你測試一下。什麼最好取決於你想要維持的吞吐量以及每個任務的延遲。一個線程池可以很容易地處理每秒一百萬個任務的順序(只要任務很簡單)。如果速度太快,你有選擇減緩製片人的速度嗎? –

+0

消費者正在向web服務發佈數據,而生產者只是從文件中讀取記錄。所以希望你能明白爲什麼製片人速度快,消費者速度慢。如果我讓製作人繼續添加到隊列中,我會耗盡內存。所以我希望有一種方法可以很好地配置ThreadPoolExecutor來爲我執行限制(就像在我的第一個示例中CallerRunsPolicy是一個粗略的限制)。理想情況下,有一種方法可以使生產者塊等待隊列中的空間,當使用最大線程並且隊列已滿時。 – Kevin

+0

只要隊列長度超過某個長度,我就會暫停生產者。 (這是等待工作的長度)這樣你可以確定它永遠不會太長。 –

回答

3

我想我做了另一個答案,因爲它對同一個問題的不同解決方案。

只能使用ThreadPoolExecutor和Semaphore。該信號將與您要允許在隊列中,每個線程執行完畢後,你會因此這裏調用釋放(beforeExecute,當項目被撤下隊列是)

Semaphore semaphore = new Semaphore(1000); 
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()){ 
    protected void beforeExecute(Runnable r, Throwable t) { 
    semaphore.release(); 
    } 
} 

public void doSubmit(Runnable r){ 
    sempahore.acquire(); 
    executor.submit(r);  
} 

最大數量來創建所有線程將暫停,直到有可用的許可證(隊列中的條目)。

+0

這工作;我想我希望,因爲ArrayBlockingQueue被記錄爲在完整隊列上調用put()時被阻止,所以只需使用ArrayBlockingQueue就會產生所需的行爲(如果內置此類功能,它肯定會更好)。 – Kevin

+0

是的,這個問題之前已經提出過,我第一次看到它時,我感到很驚訝,因爲功能並沒有以某種形式提供。 –

+0

感謝您的想法。我實際上實現了java.util.concurrent.ThreadPoolExecutor的後代,其中: - 在setMaxPoolSize中拋出UnsupportedOperationException,以防止Semaphor更改 - 具有與maxpoolsize相同的最大數量的旗標成員,由構造函數初始化 - 隊列永遠是無界的linkedblockingqueue - afterExecute方法被覆蓋並釋放一個信號燈 - 所有提交獲取信號量 完美地工作。感謝這個想法。 –

0

如果一個線程正在發出所有請求,那麼無限制的隊列將會像你喜歡的那樣被阻塞,而不會增加隊列的負​​面影響。

如果多個線程發出請求,則帶有調用者運行策略的固定池應該按照您的需要工作。池中的其餘線程將由其他請求線程保持活動狀態。

+0

有一個生產者線程和N個消費者線程。如何在使用無限隊列時阻止隊列增長?你能詳細說明嗎? – Kevin

+0

我也應該在這種情況下規定使用呼叫者運行策略。由於調用者(生產者)運行並且只有他一個,所以在他忙於執行請求時隊列不能增長。 –

+0

但不會無限制的隊列意味着失敗策略永遠不會被調用?我同意主叫方運行策略的直接切換或有界隊列工作。 – Kevin

3

當所有線程當前正在使用時,ThreadPoolExecutor將創建更多的線程。這意味着隊列可以是空的,但如果所有線程正在運行先前的任務,則新任務將創建一個新線程,直到達到最大值。

如果隊列已滿並且線程全部飽和,ThreadPoolExecutor將實際拒絕任務並拋出RejectedExecutionException。所以使用BlockingQueue實際上並不會有預期的結果。

如果要限制當前正在隊列中的任務數量,可以使用ExecutorCompletionService和後備隊列。

//core 5 max 10 with 60 second idle time 
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); 
ExecutorCompletionService completionService = new ExecutorCompletionService(executor); 
private final static int MAX_IN_QUEUE = 1000; 

public void doSubmit(Runnable r){ 
    while(executor.getQueue().size() >= MAX_IN_QUEUE) 
     completionService.poll(100,TimeUnit.MILLISECONDS); 
    completionService.submit(r); 
} 

這有明顯的副作用,不得不等待元素完成。由於在進入該區塊後實際上是非真實的可能的競賽狀況,因此我循環條件。

當然,多次提交的競爭條件,但它應該足夠節流,以防止過度擁擠的隊列。這可以通過簡單地同步doSubmit方法來解決。

0

只要隊列長度變得太長,就可以讓生產者暫停。

這樣的事情將等待任務隊列的大小限制在MAX_LEN。

ExecutorService service = 
Queue workQ = // queue of service. 
BufferedReader br = 
String line; 
while((line = br.readline()) != null) { 
    service.submit(new ProcessLineRunnable(line)); 
    while(workQ.size() > MAX_LEN) Thread.sleep(1); 
}