2017-03-16 90 views
1

我想從Python爲大約8000個文件啓動外部命令。每個文件都是獨立處理的。唯一的限制是所有文件都被處理後繼續執行。我有4個物理核心,每個核心有2個邏輯核心(multiprocessing.cpu_count()返回8)。我的想法是使用四個並行獨立進程池,這些進程將在8個內核中的4個內核上運行。這樣我的機器在此期間應該可以使用。使用外部命令多處理數千個文件

這是我一直在做的事情:

import multiprocessing 
import subprocess 
import os 
from multiprocessing.pool import ThreadPool 


def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.Popen, (cmd,)) 
    pool.close() 
    pool.join() 


def main(): 
    process_files('dir1', 'dir2', 'mol:H') 
    do_some_stuff('dir2') 
    process_files('dir2', 'dir3', 'mol:a') 
    do_more_stuff('dir3') 

一個連續的治療需要120秒爲一個批次的100個文件。上面概述的多處理版本(功能process_files)僅需20秒。但是,當我在整個8000個文件集上運行process_files時,我的PC掛起並在一小時後不凍結。

我的問題是:

1)我想ThreadPool應該初始化進程池(這裏multiprocessing.cpu_count()/2過程,要準確)。然而,我的計算機掛在8000個文件上,但不是100個,這表明可能沒有考慮池的大小。要麼,要麼我做錯了什麼。你能解釋一下嗎?

2)這是在Python中啓動獨立進程時的正確方法,當它們每個都必須啓動一個外部命令時,並且這樣所有資源都不會被處理佔用?

+0

我比較@larsks('ThreadPool'和'apply_async'和subprocess'call's)和@Roland Smith(使用'Popen'對象的手動池管理)提出的解決方案。我的基準測試表明'ThreadPool'解決方案在實踐中速度更快。非常感謝你們! – user3638629

回答

1

我覺得你的基本問題是使用subprocess.Popen。該方法不會而不是在返回之前等待命令完成。由於該函數立即返回(即使該命令仍在運行),只要您的ThreadPool有關,該函數就完成了,並且它可以產生另一個...這意味着您最終會產生8000個左右的進程。

你會使用subprocess.check_call可能有更好的運氣:

Run command with arguments. Wait for command to complete. If 
the exit code was zero then return, otherwise raise 
CalledProcessError. The CalledProcessError object will have the 
return code in the returncode attribute. 

所以:

def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.check_call, (cmd,)) 
    pool.close() 
    pool.join() 

如果你真的不關心的退出代碼,那麼你可能想subprocess.call,不會在進程中出現非零退出代碼時引發異常。

+0

謝謝你對這個非常清楚和直接的事實解釋。事實上'subprocess.Popen'必須是導致這麼多進程產生的原因。我沒有使用'subprocess.call',認爲Python會等待完成這個過程,而不是用有用的工作者來填充這個池。但這就是爲什麼游泳池是在那裏的原因。 (對不起,代表太低,無法上傳。) – user3638629

+0

您仍然可以通過單擊此答案左側的複選標記將其標記爲「已接受」答案。 – larsks

+0

是的,我知道。麻煩的是,我在決定兩個非常有用的答案(我目前正在根據兩個提出的解決方案測試結果)之間做出決定時很困難。 :d – user3638629

1

如果您使用的是Python 3,我會考慮使用map方法concurrent.futures.ThreadPoolExecutor

或者,您可以自己管理子流程列表。

以下示例定義了一個函數來啓動ffmpeg將視頻文件轉換爲Theora/Vorbis格式。它爲每個啓動的子進程返回一個Popen對象。

def startencoder(iname, oname, offs=None): 
    args = ['ffmpeg'] 
    if offs is not None and offs > 0: 
     args += ['-ss', str(offs)] 
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a', 
      'libvorbis', '-q:a', '3', '-sn', oname] 
    with open(os.devnull, 'w') as bb: 
     p = subprocess.Popen(args, stdout=bb, stderr=bb) 
    return p 

在主程序中,表示運行的子進程Popen對象的列表被保持這個樣子。

outbase = tempname() 
ogvlist = [] 
procs = [] 
maxprocs = cpu_count() 
for n, ifile in enumerate(argv): 
    # Wait while the list of processes is full. 
    while len(procs) == maxprocs: 
     manageprocs(procs) 
    # Add a new process 
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1) 
    procs.append(startencoder(ifile, ogvname, offset)) 
    ogvlist.append(ogvname) 
# All jobs have been submitted, wail for them to finish. 
while len(procs) > 0: 
    manageprocs(procs) 

因此,只有當運行子進程少於核心時才啓動新進程。多次使用的代碼分爲manageprocs函數。

def manageprocs(proclist): 
    for pr in proclist: 
     if pr.poll() is not None: 
      proclist.remove(pr) 
    sleep(0.5) 

sleep的調用用於防止程序在循環中旋轉。

+0

感謝您提到'concurrent.futures.ThreadPoolExecutor'(這裏仍然使用Python 2.7)。感謝您提供手動池管理的好例子。我試圖做類似的事情(考慮到迭代它時我不應該在列表中移除),但一定是出了問題。我將很快測試這個解決方案。 (對不起,代表太低,不能upvote。) – user3638629

+0

我比較了兩種方法(你的答案和@larsks')。我非常喜歡這個解決方案,但似乎手動管理池會導致開銷,可能是因爲對「睡眠」的調用(我讓進程管理器睡了0.2秒,因爲它看起來更合適)。在批量測試中,我的真實輸入大小的1/10,手動池管理比'cpu_count() - 1'內核上的ThreadPool慢8%,比'cpu_count()/ 2'上的'ThreadPool'慢27%核心。 – user3638629

+0

你必須做一些真正的分析,看看差異來自哪裏。有很多因素會影響事物。例如,'cpu_count()'是子進程的最優*數量。您應該嘗試從'cpu_count()/ 2'到'cpu_count()* 2'的範圍內的任何內容。此外,您應該根據「molconvert」通常需要的時間調整「睡眠」的數量。但是,由於我已經完全切換到Python 3,我現在傾向於使用'concurrent.futures.ThreadPoolExecutor'這樣的東西。 –

相關問題