0

我可以直接與你們覈對一下,我的概念在使用Python的多處理並行執行exe文件時有什麼根本性的錯誤。使用多處理進程()進行並行執行的正確方法

所以我有一大堆作業(在示例代碼中爲100000),我想使用所有可用的核心(我的計算機中有16個)來並行運行它們。下面的代碼沒有像我看到的許多例子那樣使用隊列,但它似乎工作。只是爲了避免代碼「起作用」的情況,但是當我擴展到運行多個計算節點時,有一個巨大的錯誤在等待炸燬。誰能幫忙?

import subprocess 
import multiprocessing 

def task_fn(task_dir) : 
    cmd_str = ["my_exe","-my_exe_arguments"] 
    try : 
     msg = subprocess.check_output(cmd_str,cwd=task_dir,stderr=subprocess.STDOUT,universal_newlines=True) 
    except subprocess.CalledProcessError as e : 
     with open("a_unique_err_log_file.log","w") as f : 
      f.write(e.output) 
    return; 

if __name__ == "__main__": 

    n_cpu = multiprocessing.cpu_count() 
    num_jobs = 100000 
    proc_list = [multiprocessing.Process() for p in range(n_cpu)] 

    for i in range(num_jobs): 
     task_dir = str(i) 
     task_processed = False 
     while not(task_processed) : 
      # Search through all processes in p_list repeatedly until a 
      # terminated processs is found to take on a new task 
      for p in range(len(p_list)) : 
       if not(p_list[p].is_alive()) : 
        p_list[p] = multiprocessing.Process(target=task_fn,args=(task_dir,)) 
        p_list[p].start() 
        task_processed = True 

    # At the end of the outermost for loop 
    # Wait until all the processes have finished 
    for p in p_list : 
     p.join() 

    print("All Done!") 

回答

1

不是親自產卵和管理過程,而是使用Pool of workers。它旨在爲您處理所有這些問題。

由於您的工作人員正在生成一個子進程,因此您可以使用線程而不是進程。

此外,似乎工人將寫在同一個文件。您將需要保護其對併發實例的訪問,否則結果將完全失序。

from threading import Lock 
from concurrent.futures import ThreadPoolExecutor 


mutex = Lock() 
task_dir = "/tmp/tasks" 


def task_fn(task_nr): 
    """This function will run in a separate thread.""" 
    cmd_str = ["my_exe","-my_exe_arguments"] 
    try: 
     msg = subprocess.check_output(cmd_str, cwd=task_dir, stderr=subprocess.STDOUT, universal_newlines=True) 
    except subprocess.CalledProcessError as e: 
     with mutex: 
      with open("a_unique_PROTECTED_err_log_file.log", "w") as f : 
       f.write(e.output) 

    return task_nr 


with ThreadPoolExecutor() as pool: 
    iterator = pool.map(task_fn, range(100000)) 
    for result in iterator: 
     print("Task %d done" % result) 
+0

嘿,謝謝你的回覆!將嘗試它。但基本上,我編碼的方式沒有任何問題嗎? – bFig8

+0

是的。事實上,如果發生錯誤,由於未受保護的文件訪問,您的日誌文件將全部被打亂。而且,那裏的流程的使用是過度的。只需使用線程。 – noxdafox

+0

好吧,也許有一個誤解,「a_unique_err_log_file.log」是我懶惰的說,每個日誌文件將是唯一的。這並不意味着從字面上理解,因爲在文件名中真的是「a_unique_err_log_file.log」,所以每個進程都將寫入一個獨特的文件。 – bFig8