2009-11-24 89 views
8

我是新來的蟒蛇和threading - 做一些音樂文件轉換,並希望能夠利用我的機器上的多個核心(每個核心一個活動轉換線程)的一些進展。如何限制python中活動線程的數量?

class EncodeThread(threading.Thread): 
    # this is hacked together a bit, but should give you an idea 
    def run(self): 
     decode = subprocess.Popen(["flac","--decode","--stdout",self.src], 
          stdout=subprocess.PIPE) 
     encode = subprocess.Popen(["lame","--quiet","-",self.dest], 
           stdin=decode.stdout) 
     encode.communicate() 

# some other code puts these threads with various src/dest pairs in a list 

for proc in threads: # `threads` is my list of `threading.Thread` objects 
    proc.start() 

一切正常,所有的文件得到編碼,bravo! ...然而,所有的進程立即產卵,但我只想一次運行兩個(每個核心一個)。一旦完成,我希望它移動到列表中的下一個,直到完成,然後繼續執行程序。

我該怎麼做?

(我已經看過了線程池和隊列功能,但我無法找到一個簡單的答案。)

編輯:也許我應該補充一點,我的每個線程的使用subprocess.Popen運行單獨的命令行解碼器(flac)通過管道輸入命令行編碼器(lame/mp3)。

+0

爲什麼要麻煩?讓你的線程相互競爭有什麼問題?讓每個核心徹底飽和工作會更快。 – 2009-11-24 02:34:43

+0

好吧,我想我沒有這樣想過......擁有超過2000個文件的音樂庫,我認爲同時產生(同時)2,000個解碼過程(flac)到2,000個編碼過程(跛腳)將是次優的。我錯了嗎? – thornomad 2009-11-24 02:39:53

+0

@thornomad:是的,你錯了。由於您擁有2個內核,因此僅限於2個進程是錯誤的。一個過程不會讓工作核心飽和。即使是由三部分組成的流程管線也可能具有足夠的I/O,以致內核未被完全佔用。 – 2009-11-24 03:08:25

回答

4

「我的每個線程都使用subprocess.Popen來運行單獨的命令行[process]」。

爲什麼有一堆線程管理一堆進程?這正是操作系統爲你做的。爲什麼微管理操作系統已經管理的內容?

與其監督進程的線程無關,只是分離進程。你的進程表可能無法處理2000個進程,但它可以很容易地處理幾十個(也許幾百個)進程。

你想有更多工作比你的CPU可以處理排隊。真正的問題是內存之一 - 而不是進程或線程。如果所有進程的所有活動數據的總和超過物理內存,則必須交換數據,這會降低速度。

如果你的進程有一個相當小的內存佔用,你可以有很多很多的運行。如果你的進程有很大的內存佔用,你不能有很多的運行。

+0

heh。我現在看到了我被黑了一起的方法 - 這有點多餘。那麼,是否有一種子進程管理「池」的方式(正如其他人所建議的那樣)。感謝您的輸入。隨着我的學習......只是使用'subprocess.poll()'來查看所做的事情以及仍在運行的事情?再次感謝。 – thornomad 2009-11-24 03:14:03

+0

正確。你可以使用一組簡單的過程;刪除完成的。添加一個,並將該集合的大小保持在某個限制之下。這只是一個具有'add'和'remove'的集合。 – 2009-11-24 04:01:26

1

如果您使用的是默認的「cpython」版本,那麼這不會對您有所幫助,因爲一次只能執行一個線程;看起來Global Interpreter Lock。相反,我建議在Python 2.6中查看multiprocessingmodule - 它使得並行編程變得簡單。您可以使用2*num_threads進程創建一個Pool對象,併爲其提供一些任務。它將一次執行最多2*num_threads個任務,直到完成所有任務。

在工作中,我最近遷移了一堆Python XML工具(一個不同的xpath grepper和bulk xslt轉換器)來使用這個工具,並且每個處理器有兩個進程有非常好的結果。

+1

如果你的子進程將執行函數在你的Python代碼中,多處理模塊非常棒。如果你正在調用外部程序,那麼這個模塊不會提供優於子進程模塊的優勢...因爲這些外部程序不會有任何方法將結果返回給臨時文件或管道以外的父級。多處理模塊的巨大IPC優勢在您執行的外部程序中丟失。 (例如,讓多進程調用子進程中的每個進程聽起來非常愚蠢)。 – 2009-11-25 04:46:55

0

我不是這方面的專家,但我讀了一些關於「鎖」的內容。 This article可能會幫助你

希望這有助於

1

它看起來對我說,你想要的是某種形式的游泳池,並在泳池您希望有n個線程,其中n ==處理器數量在你的系統上。然後,您將擁有另一個線程,其唯一的工作是將作業提交給隊列,工作線程可以在隊列中自由選擇並處理它們(因此對於雙代碼機器,您將有三個線程,但主線程會做很少)。

因爲你是Python的新手,雖然我會假設你不知道GIL,它是關於線程的副作用。如果您閱讀我關聯的文章,您很快就會明白,爲什麼傳統的多線程解決方案在Python世界中並不總是最好的。相反,你應該考慮使用multiprocessing模塊(Python 2.6中的新功能,在2.5中可以使用use this backport)來實現相同的效果。它通過使用多個進程來解決GIL的問題,就好像它們是同一應用程序中的線程一樣。對於如何共享數據(您正在不同的內存空間中工作)有一些限制,但實際上這並非壞事:它們只是鼓勵良好實踐,如最小化線程之間的接觸點(或本例中的進程)。

在你的情況下,你可能會使用指定here的泳池。

+0

謝謝 - 我會看看多進程......我編輯了我的問題以獲得更多詳細信息......看起來,subprocess.Popen確實有點中斷並做自己的事情。 – thornomad 2009-11-24 02:50:57

+0

多處理模塊BTW對於2.6(來自支持2.4和2.5的pyprocessing第三方模塊)來說是一個很好的補充。 但是,它不適合運行外部程序。多處理模塊的主要優點在於它在線程支持後建模。您可以創建Queue()作爲主要的內部(線程/進程)通信機制,以消除您自己顯式鎖定的大部分需求。 (Queue()爲任意對象的多個生產者和消費者提供一致的支持)。如果孩子們運行Python代碼很好。 – 2009-11-25 04:41:02

1

簡答:不要使用線程。

對於一個工作的例子,你可以看看我最近在工作中扔在一起的東西。這是一個圍繞ssh的小包裝,它運行一個可配置數量的Popen()子過程。我已將它發佈在:Bitbucket: classh (Cluster Admin's ssh Wrapper)

如上所述,我不使用線程;我剛從孩子身上產生,循環他們呼叫他們的.poll()方法並檢查超時(也可配置)並在收集結果時補充池。我已經玩過不同的sleep()值,並且在過去我已經編寫了一個版本(在子進程模塊被添加到Python之前),其中使用了信號模塊(SIGCHLD和SIGALRM)和os.fork()os.execve()函數---我的管道和文件描述符管道等)。

在我的情況下,我逐漸打印結果,因爲我收集它們...並記住所有這些結果總結(當所有作業已完成或因超時超時而死亡時)。

我在25,000個內部主機(其中許多已關閉,退休,位於國際上,無法訪問我的測試帳戶等)上發佈此信息。它在兩個多小時內完成了工作,沒有任何問題。 (其中大約60個是由於系統處於退化/顛簸狀態而超時 - 證明我的超時處理工作正常)。

所以我知道這個模型可靠地工作。使用此代碼運行100個當前的ssh進程似乎不會引起任何明顯的影響。 (這是一個適中的FreeBSD盒子)。我曾經運行舊的(前子進程)版本,我的舊512MB筆記本電腦上有100個併發進程,沒有問題。 (順便說一下:我打算清理它併爲其添加功能;隨意貢獻或克隆自己的分支;這就是Bitbucket.org的用處)。

+0

謝謝 - 我今天會更仔細地看一下。我很快就想到了一個非常簡單的while循環,它似乎只是檢查'p.communicate()'方法。 (PS:我認爲你在源代碼的第4行丟失了一個關閉''''')。 – thornomad 2009-11-24 12:34:46

30

如果要限制並行的線程數,使用semaphore

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads) 

class EncodeThread(threading.Thread): 

    def run(self): 
     threadLimiter.acquire() 
     try: 
      <your code here> 
     finally: 
      threadLimiter.release() 

啓動所有線程一次。除maximumNumberOfThreads之外的所有內容都將在threadLimiter.acquire()中等待,等待的線程只會在另一個線程通過threadLimiter.release()後纔會繼續。

+1

這完全回答了最初的問題。非常適合那些最終在Google上搜索的用戶。 – 2013-06-22 03:10:26