2013-03-11 45 views
1

我必須在200個循環中的文件上運行一個程序。一次運行x個命令的次數

現在我讓他們跑這樣的:

for combo in it.combinations(files, 2): 
    cmd = ["command", combo[0], combo[1]] 
    subprocess.Popen(cmd) 

我想一次只運行60說不會壓倒計算機,命令是非常處理器密集型。一旦有60個進程運行,暫停循環的最好方法是什麼,然後在完成後重新開始,以便始終運行60個進程?

+0

檢查threading.Thread,你可以創建60個線程,然後讓他們從列表中繪製他們的命令,直到它是空的(然後讓他們退出) – 2013-03-11 15:11:43

+0

@ maxk。使用python的線程不會提高一般情況下的性能。 OP已經包含了使用子進程模塊的代碼,推薦在python中並行執行。 – Wilduck 2013-03-11 15:15:20

+0

它不會提高性能,但它更具可定製性,並且編寫自己的線程子類的能力爲您提供了更多類似問題的選項。只是一個想法 – 2013-03-11 15:35:17

回答

4
#!/usr/bin/env python 
import itertools 
import subprocess 
from multiprocessing.dummy import Pool # use threads 

def run(combo): 
    cmd = ["command", combo[0], combo[1]] 
    return combo, subprocess.call(cmd) 

def main(): 
    p = Pool(60) # 60 subprocesses at a time 
    for combo, rc in p.imap_unordered(run, itertools.combinations(files, 2)): 
     print("%s exited with %s" % (combo, rc)) 
    p.close() 
    p.join() 

if __name__ == "__main__": 
    main() 

This answer demonstrates various techniques to limit number of concurrent subprocesses:它顯示了multiprocessing.Pool,concurrent.futures,線程+基於隊列的解決方案。

+0

+1好吧,讓我們開始滾動:這是一個合理的答案。我會贊成這一點。 – hughdbrown 2013-03-11 16:04:53

+0

我認爲這會奏效,謝謝! – helicase 2013-03-11 17:24:02

0

你想是這樣的:

import socket 
import threading 
import Queue 
import subprocess 

class IPThread(threading.Thread): 
    def __init__(self, queue, num): 
     super(IPThread, self).__init__() 
     self.queue = queue 
     self.num = num 
    def run(self): 
     while True: 
      try: 
       args = self.queue.get_nowait() 
       cmd = ["echo"] + [str(i) for i in args] 
       p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
       out, err = p.communicate() 
       print out 
      except Queue.Empty: 
       # Nothing left in the Queue -- we are done 
       print "Queue %d done" % self.num 
       break 
      except Exception as err: 
       # Handle exception 
       print err 
      self.queue.task_done() 

def create_threads(q, size): 
    for i in range(size): 
     thread = IPThread(q, i) 
     thread.setDaemon(True) 
     thread.start() 
    q.join() 

def fill_queue(q): 
    # Call q.put(args) in a loop to populate Queue with arguments 
    from itertools import permutations 
    x = list(range(20)) 
    for arg1, arg2 in permutations(x, 2): 
     q.put([arg1, arg2]) 
    print q.qsize() 

def main(): 
    q = Queue.Queue() 
    fill_queue(q) 
    create_threads(q, 60) 
    print "Done" 

if __name__ == '__main__': 
    main() 

創建一個事物的隊列去努力。專門化你的線程派生類。旋轉你的線程。等待他們完成。

您可以看到這些任務正在同時運行,因爲它們的輸出會相互干擾。這是一個功能!

+0

本示例不會等待該過程完成,也不會處理該過程由於其管道已滿而生成輸出並停止的可能性。正如所寫,它將一次啓動所有的流程。這不是海報想要的。 – tdelaney 2013-03-11 15:37:19

+0

一次最多運行60個進程,每個線程創建一個。它不會一次啓動所有的流程。你填寫輸出緩衝區是正確的。這大部分都是採用海報。例如,我不知道應該如何排隊,所以我只是使用整數。我不知道OP的輸出可能填滿多大的緩衝區,所以我只能猜測'bufsize'要使用。但這是大部分問題,對 - 同時運行多個進程? – hughdbrown 2013-03-11 16:01:16

+0

它被編輯包含p.communicate(),所以現在它工作正常。 – tdelaney 2013-03-11 16:03:44

0

你可以做非常簡單的事情,如:

from time import sleep 

count = 0 
for combo in it.combinations(files, 2): 
    while count < 60: 
     cmd = ["command", combo[0], combo[1]] 
     subprocess.Popen(cmd) 
     count = count + 1 
     if subprocess_is_done: 
      count = count - 1 
    sleep(5) 

顯然,你需要弄清楚如何從您的命令得到subprocess_is_done

這個據我可以告訴適用於簡單的情形,但不知道你想運行哪些...

+0

我正在考慮這些方面,但我不確定如何判斷子進程何時完成。也許等待()或輪詢(),不知道他們如何在這個循環中工作。 – helicase 2013-03-11 15:25:33

+0

你需要記錄所有當前正在運行的進程,並使用'。poll()'來確定它們的狀態。 – 2013-03-11 15:27:00

1

這可能幫助:

import itertools as it 
import time 
import subprocess 

files = range(5) 
max_load = 3 
sleep_interval = 0.5 

pid_list = [] 
for combo in it.combinations(files, 2): 
    # Random command that takes time 
    cmd = ['sleep', str(combo[0]+combo[1])] 

    # Launch and record this command 
    print "Launching: ", cmd 
    pid = subprocess.Popen(cmd) 
    pid_list.append(pid) 

    # Deal with condtion of exceeding maximum load 
    while len(filter(lambda x: x.poll() is None, pid_list)) >= max_load: 
    time.sleep(sleep_interval)