2012-07-19 74 views
2

所以我寫了一個工具,它將項目列表分割成給定數量的列表(比如說10),然後將這10個列表和產生的10個線程分開,「 EvaluationThreads「(擴展threading.thread),並且每個線程評估提供的任何評估值。當我開始每個線程我把他們都到一個列表和它們產卵起飛後我有以下代碼:正確的方法來處理Python中的線程連接

for th in threadList: 
    th.join() 
    someTotal = th.resultsAttribute 

這就是我如何處理等待所有線程完成,並收集他們的信息。雖然這是等待所有事情完成並收集結果的一種工作方式,但我覺得必須有一種更優雅的方式來完成它,因爲這些線程可能在不同的時間完成,並且如果第一個線程開始完成,早些完成的人必須等待該線程完成才能加入。有沒有辦法獲得這些線程的信息,並在完成時加入它們,而不是按照它們開始的順序加入它們?我原本認爲我會在線程中使用某種回調,但我不確定是否有更可接受的解決方案。

感謝您的幫助。

編輯:爲了澄清,我的評估函數沒有CPU綁定,我不是試圖在線程之間分配文件以儘快完成它,每個線程都有固定的偶數數量的作業。

+0

爲什麼你的問題是一個問題?一個已經完成但尚未加入的線程正在浪費很少的資源(基本上,內核或用戶空間中操作系統維護的表中的一個小表項)。 – abarnert 2012-07-20 00:25:06

+0

我想這不完全是一個問題,但它似乎是一個非常不雅的解決方案,如果空閒線程不是一個只是等待加入的問題,我想我不會擔心它。 – hkothari 2012-07-20 00:39:37

+2

備註:如果您的「評估」操作受CPU限制,那麼在此應用程序中使用線程可能沒有獲得太多好處。閱讀CPython的全球解釋器鎖定(GIL)。 – 2012-07-20 01:00:09

回答

1

使用隊列儘快從你的線程將信息推了,因爲它是可用的:

比方說,這是你的線程:

class myThread(threading.Thread): 
    def __init__(self, results_queue): 
     self.results_queue = results_queue 
     #other init code here 


    def run(self): 
     #thread code here 

     self.results_queue.put(result) #result is the information you want from the thread 

這是你的主要代碼:

import Queue #or "import queue" in Python 3.x 
results_queue = Queue() 

#thread init code here 

for i in xrange(num_threads_running): 
    data = results_queue.get() # queue.get() blocks until some item is available 
    #process data as it is made available 

#at this point, there is no need to .join(), since all the threads terminate as soon as they put data to the queue. 
+0

我相信只有當你明確地加入一個線程或者Thread對象被銷燬時,底層的系統資源纔會被釋放,這意味着需要調用join(),除非你關於退出,或者你有一個用塊或等價物來管理線程對象。 – abarnert 2012-07-20 01:38:45

+1

@abarnert:你有鏈接嗎?我無法在文檔中找到任何關於此的信息。所有它在Thread.join()下面說的是它「阻塞調用線程,直到它的連接方法被調用的線程終止。」 – 2012-07-20 01:55:26

+1

@abarnert:從文檔中,「其他線程可以調用一個線程的join()方法。這會阻塞調用線程,直到其調用join()方法的線程終止。」這意味着線程終止獨立於被調用的join()方法而發生。 – 2012-07-20 02:23:47

2

對於你的主要問題:

如果你正在做更復雜的事情t這個 - 或者,特別是如果你反覆這樣做 - 你可能需要一個「線程組」類。其中有幾十個是預製的,但如果你不喜歡其中任何一個,自己寫一個就很麻煩。

然後,而不是這樣的:

threadList = [] 
for argchunk in splitIntoChunks(values, 10): 
    threadList.append(threading.Thread(target=myThreadFunc, args=argchunk)) 
... 
someTotal = 0 
for th in threadList: 
    th.join() 
    someTotal += th.resultsAttribute 

你可以這樣做:

threadGroup = ThreadGroup.ThreadGroup() 
for argchunk in splitIntoChunks(values, 10): 
    threadGroup.newThread(myThreadFunc, argchunk) 
threadGroup.join() 
someTotal = sum(th.resultsAttribute for th in threadGroup) 

或者,甚至更好,一個完整的線程池庫,這樣你就可以做到這一點:

pool = ThreadPool(10) 
for argchunk in splitIntoChunks(values, 100): 
    pool.putRequest(myThreadFunc, argchunk) 
pool.wait() 

這裏的優勢是,您可以輕鬆地在10個線程上安排100個作業,而不是10個作業o每個線程不需要維護一個隊列等等。缺點是你不能只迭代線程來獲得返回值,你必須迭代工作 - 理想情況下,你不想保留工作活着直到最後,以便您可以迭代它們。

這給我們帶來了第二個問題,即如何從線程(或作業)中獲取值。有很多種方法可以做到這一點。

你做了什麼工作。你甚至不需要任何鎖定。

使用回調,如你所說,也適用。但請記住,回調將在工作線程上運行,而不是主線程,因此如果它訪問某個全局​​對象,則需要某種同步。

如果你想反正同步,可能沒有任何好處的回調。例如,如果你所要做的只是求和一堆值,你可以設置total=[0],並讓每個線程在鎖內執行total[0] += myValue。 (當然,在這種情況下,它可能更有意義,只是做在主線程中求和,避免鎖,但如果同化的結果的工作很多更爲強大,這樣的選擇可能不是那麼簡單。)

您也可以使用某種原子對象,而不是顯式鎖定。例如,標準Queue.Queue和collections.deque都是不可分割的,因此每個線程都可以只設置q = Queue.Queue(),則每個線程推動通過連接你只是重複和總結隊列的值之後做q.push(myValue),那麼它的結果。事實上,如果每個線程都只需要推送一次隊列,就可以在隊列本身上執行10次阻塞獲取,之後您就知道group.join()pool.wait()或其他任何會快速返回的隊列。

或者你甚至可以把回調的工作到一個隊列。再次,你可以做10個阻塞獲取隊列,每次執行結果。

如果每個線程都可以返回多個對象,他們可以把警戒值或回調到時,他們正在做的,你的主線程不斷出現,直到它顯示的是10個哨兵隊列。

相關問題