4

在下面的代碼中,我試圖創建一個沙盒主工作系統,其中工作人員中的全局變量更改不反映給其他工作人員。打印功能使多處理程序失敗

爲了實現這一點,每創建一個任務時都會創建一個新進程,並且爲了使執行並行,進程本身的創建由ThreadPoolExecutor進行管理。

import time 
from concurrent.futures import ThreadPoolExecutor 
from multiprocessing import Pipe, Process 


def task(conn, arg): 
    conn.send(arg * 2) 


def isolate_fn(fn, arg): 

    def wrapped(): 
    parent_conn, child_conn = Pipe() 
    p = Process(target=fn, args=(child_conn, arg), daemon=True) 
    try: 
     p.start() 
     r = parent_conn.recv() 
    finally: 
     p.join() 
    return r 

    return wrapped 


def main(): 
    with ThreadPoolExecutor(max_workers=4) as executor: 
    pair = [] 

    for i in range(0, 10): 
     pair.append((i, executor.submit(isolate_fn(task, i)))) 

     # This function makes the program broken. 
     # 
     print('foo') 

    time.sleep(2) 

    for arg, future in pair: 
     if future.done(): 
     print('arg: {}, res: {}'.format(arg, future.result())) 
     else: 
     print('not finished: {}'.format(arg)) 

    print('finished') 

main() 

這個程序工作正常,直到我把print('foo')函數放在循環中。如果該功能存在,某些任務仍未完成,更糟的是,該程序本身並未完成。

結果並不總是相同的,但下面是典型的輸出:

foo 
foo 
foo 
foo 
foo 
foo 
foo 
foo 
foo 
foo 
arg: 0, res: 0 
arg: 1, res: 2 
arg: 2, res: 4 
not finished: 3 
not finished: 4 
not finished: 5 
not finished: 6 
not finished: 7 
not finished: 8 
not finished: 9 

爲什麼這個節目如此脆弱?

我使用Python 3.4.5。

回答

1

使用

from multiprocessing import set_start_method 

... rest of your code here .... 

if __name__ == '__main__': 
    set_start_method('spawn') 
    main() 

如果您搜索爲#1的python多處理和多線程,你會發現提吊類似問題公平幾個問題嘗試。 (特別是Python版本2.7和3.2)

混合多線程和多處理仍然有點問題,甚至python文檔multiprocessing.set_start_method提到。在你的情況下,'spawn''forkserver'應該沒有任何問題。

另一種選擇可能是直接使用MultiProcessingPool,但在更復雜的用例中這可能不適用於您。

Btw。 '未完成'可能仍會出現在您的輸出中,因爲您並未等待子進程完成,但整個代碼不應再掛起,並始終完成乾淨。

0

您不是每次都創建ThreadPoolExecutor,而是在每次迭代中使用預初始化池。我真的無法跟蹤哪個打印語句妨礙你?

+0

謝謝。我已經闡明瞭打印功能。 –