2016-09-10 27 views
3

給出以下Python代碼:Python的多處理,游泳池地圖 - 取消所有正在運行的進程,如果一個,返回所需的結果

import multiprocessing 

def unique(somelist): 
    return len(set(somelist)) == len(somelist) 


if __name__ == '__main__': 
    somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]] 

    pool = multiprocessing.Pool() 
    reslist = pool.map(unique, somelist) 
    pool.close() 
    pool.join() 
    print "Done!" 

    print reslist 

現在想象一下,與在這個玩具例如整數列表非常長,我想在這裏實現以下內容:如果某個列表中的某個列表返回True,則會終止所有正在運行的進程。

這導致了兩個問題(甚至更多,我還沒有想出):

  • 我怎麼能「讀」從完成的加工結果/「聽」,而其他進程正在跑步?如果例如一個進程正在處理來自somelist的[1,2,3,4,5],並且在所有其他進程之前完成,我如何才能在此刻讀取該進程的結果?

  • 鑑於在其他運行時可以「讀出」完成進程的結果的情況:如何將此結果用作終止所有其他正在運行的進程的條件?

例如,如果一個進程已經完成並且返回True,我該如何使用它作爲終止所有其他(仍然)正在運行的進程的條件?

預先感謝您的任何提示 丹

回答

4

使用pool.imap_unordered,以查看他們拿出任何命令的結果。

reslist = pool.imap_unordered(unique, somelist) 
pool.close() 
for res in reslist: 
    if res: # or set other condition here 
     pool.terminate() 
     break 
pool.join() 

您可以遍歷主進程中的imap reslist,但池進程仍在生成結果。

+0

也打破循環,否則可能會被卡住在游泳池終止後等待下一個結果。 –

+0

很好,謝謝:) –

1

沒有花哨的IPC(進程間通信)技巧,最簡單的方法是使用帶回調函數的Pool方法。該回調在主程序中運行(在由multiprocessing創建的線程中),並在每個結果可用時使用。當回調看到你喜歡的結果時,它可以終止Pool。例如,

import multiprocessing as mp 

def worker(i): 
    from time import sleep 
    sleep(i) 
    return i, (i == 5) 

def callback(t): 
    i, quit = t 
    result[i] = quit 
    if quit: 
     pool.terminate() 

if __name__ == "__main__": 
    N = 50 
    pool = mp.Pool() 
    result = [None] * N 
    for i in range(N): 
     pool.apply_async(func=worker, args=(i,), callback=callback) 
    pool.close() 
    pool.join() 
    print(result) 

這幾乎肯定會顯示以下(OS調度變幻莫測可以允許其他輸入或兩個被消耗):

[False, False, False, False, False, True, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None] 
相關問題