2012-05-02 61 views
61

在下面的示例代碼中,我想恢復函數worker的返回值。我怎麼能這樣做呢?這個值在哪裏存儲?如何恢復傳遞給multiprocessing.Process的函數的返回值?

示例代碼:

import multiprocessing 

def worker(procnum): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return procnum 


if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print jobs 

輸出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>] 

我似乎無法找到存儲在jobs對象相關的屬性。

由於提前, BLZ

回答

64

使用shared variable溝通。例如像這樣:

import multiprocessing 

def worker(procnum, return_dict): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return_dict[procnum] = procnum 


if __name__ == '__main__': 
    manager = multiprocessing.Manager() 
    return_dict = manager.dict() 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,return_dict)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print return_dict.values() 
+16

我建議使用一個['multiprocessing.Queue'](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue),而不是一個'管理器'在這裏。使用'Manager'需要產生一個全新的進程,當'Queue'執行時這是過度的。 – dano

+1

@dano:我想知道,如果我們使用Queue()對象,我們不能確定每個進程返回值的順序。我的意思是如果我們需要結果的順序,做下一步工作。我們如何確定哪個輸出來自哪個進程 – Catbuilts

+4

@Cbubuilts您可以從每個進程返回一個元組,其中一個值是您關心的實際返回值,另一個是來自進程的唯一標識符。但我也想知道爲什麼你需要知道哪個進程正在返回哪個值。如果你真的需要了解這個過程,或者你需要關聯你的輸入列表和輸出列表?在這種情況下,我建議使用'multiprocessing.Pool.map'來處理你的工作項目清單。 – dano

36

我認爲@sega_sai提出的方法是更好的方法。但它確實需要一個代碼示例,所以這裏有雲:

import multiprocessing 
from os import getpid 

def worker(procnum): 
    print 'I am number %d in process %d' % (procnum, getpid()) 
    return getpid() 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes = 3) 
    print pool.map(worker, range(5)) 

,它將打印返回值:

I am number 0 in process 19139 
I am number 1 in process 19138 
I am number 2 in process 19140 
I am number 3 in process 19139 
I am number 4 in process 19140 
[19139, 19138, 19140, 19139, 19140] 

如果您熟悉map(Python的內置),這不應該是太具挑戰性。否則看看sega_Sai's link

請注意,只需要很少的代碼。 (還請注意過程如何重用)。

+0

任何想法爲什麼我的'getpid()'返回所有相同的值?我正在運行Python3 – zelusp

+0

我不確定池如何將工作分配給工作人員。如果他們真的很快,也許他們最終都會在同一個員工身上?它是否一致發生?另外,如果你添加一個延遲? – Mark

+0

我也認爲這是一個速度相關的東西,但是當我使用超過10個進程提供的'pool.map'範圍爲1,000,000時,我最多看到兩個不同的pid。 – zelusp

5

您可以使用內置的exit來設置進程的退出代碼。它可以從exitcode屬性的過程中獲得:

import multiprocessing 

def worker(procnum): 
    print str(procnum) + ' represent!' 
    exit(procnum) 

if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    result = [] 
    for proc in jobs: 
     proc.join() 
     result.append(proc.exitcode) 
    print result 

輸出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[0, 1, 2, 3, 4] 
+2

被警告,這種方法可能會變得混亂。一般情況下,進程應該退出,退出代碼爲0,完成後沒有錯誤。如果您有任何監控您的系統過程退出代碼,那麼您可能會將這些報告視爲錯誤。 – ferrouswheel

3

爲別人誰是尋求如何使用Queue獲得從Process值:

import multiprocessing 

ret = {'foo': False} 

def worker(queue): 
    ret = queue.get() 
    ret['foo'] = True 
    queue.put(ret) 

if __name__ == '__main__': 
    queue = multiprocessing.Queue() 
    queue.put(ret) 
    p = multiprocessing.Process(target=worker, args=(queue,)) 
    p.start() 
    print queue.get() # Prints {"foo": True} 
    p.join() 
+0

當我在我的工作進程中放入某個隊列時,我的連接永遠不會到達。任何想法可能會如何? –

+0

@LaurensKoppenol你的意思是說你的主代碼永久掛在p.join()上,永遠不會繼續?你的過程是否有無限循環? –

+3

是的,它無限地掛在那裏。我的工作人員全部完成(在工作人員功能結束後循環,之後打印聲明打印,供所有工作人員使用)。連接不做任何事情。如果我從我的函數中刪除'Queue',它確實允許我通過'join()' –

7

這個例子說明如何使用multiprocessing.Pipe實例的列表s至從進程的任意數量的返回字符串:

​​

輸出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!'] 

此解決方案使用更少的資源比使用

或者使用

它是非常有啓發性看源每種類型。

+0

如果不將管道設爲全局變量,最好的方法是什麼? – Nickpick

+0

我把所有的全局數據和代碼放到一個主函數中,它的工作原理是一樣的。這是否回答你的問題? –

+0

在添加(發送)任何新值之前,總是必須讀取管道? – Nickpick

0

我修改vartec的回答有點,因爲我需要得到從函數的錯誤代碼。 (感謝vertec !!!它的一個可怕的把戲)

這也可以用manager.list來完成,但我認爲最好是在一個字典中存儲它並存儲一個列表。這樣,我們保留函數和結果的方式,因爲我們無法確定列表將被填充的順序。

from multiprocessing import Process 
import time 
import datetime 
import multiprocessing 


def func1(fn, m_list): 
    print 'func1: starting' 
    time.sleep(1) 
    m_list[fn] = "this is the first function" 
    print 'func1: finishing' 
    # return "func1" # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list): 
    print 'func2: starting' 
    time.sleep(3) 
    m_list[fn] = "this is function 2" 
    print 'func2: finishing' 
    # return "func2" 

def func3(fn, m_list): 
    print 'func3: starting' 
    time.sleep(9) 
    # if fail wont join the rest because it never populate the dict 
    # or do a try/except to get something in return. 
    raise ValueError("failed here") 
    # if we want to get the error in the manager dict we can catch the error 
    try: 
     raise ValueError("failed here") 
     m_list[fn] = "this is third" 
    except: 
     m_list[fn] = "this is third and it fail horrible" 
     # print 'func3: finishing' 
     # return "func3" 


def runInParallel(*fns): # * is to accept any input in list 
    start_time = datetime.datetime.now() 
    proc = [] 
    manager = multiprocessing.Manager() 
    m_list = manager.dict() 
    for fn in fns: 
     # print fn 
     # print dir(fn) 
     p = Process(target=fn, name=fn.func_name, args=(fn, m_list)) 
     p.start() 
     proc.append(p) 
    for p in proc: 
     p.join() # 5 is the time out 

    print datetime.datetime.now() - start_time 
    return m_list, proc 

if __name__ == '__main__': 
    manager, proc = runInParallel(func1, func2, func3) 
    # print dir(proc[0]) 
    # print proc[0]._name 
    # print proc[0].name 
    # print proc[0].exitcode 

    # here you can check what did fail 
    for i in proc: 
     print i.name, i.exitcode # name was set up in the Process line 53 

    # here will only show the function that worked and where able to populate the 
    # manager dict 
    for i, j in manager.items(): 
     print dir(i) # things you can do to the function 
     print i, j 
1

出於某種原因,我無法找到如何與Queue任何地方做一個普通的例子(甚至Python的文檔例子不產卵多個進程),所以這裏是我得到的後像10次嘗試工作:

def add_helper(queue, arg1, arg2): # the func called in child processes 
    ret = arg1 + arg2 
    queue.put(ret) 

def multi_add(): # spawns child processes 
    q = Queue() 
    processes = [] 
    rets = [] 
    for _ in range(0, 100): 
     p = Process(target=add_helper, args=(q, 1, 2)) 
     processes.append(p) 
     p.start() 
    for p in processes: 
     ret = q.get() # will block 
     rets.append(ret) 
    for p in processes: 
     p.join() 
    return rets 

Queue是阻塞,線程安全的隊列中,你可以用它來返回值從子進程存儲。所以你必須將隊列傳遞給每個進程。一些不太明顯的是,你們從隊列中有get()joinProcess ES否則隊列已滿,並且塊之前的一切。

更新爲那些誰是面向對象(在Python 3進行測試。4):

from multiprocessing import Process, Queue 

class Multiprocessor(): 

    def __init__(self): 
     self.processes = [] 
     self.queue = Queue() 

    @staticmethod 
    def _wrapper(func, queue, args, kwargs): 
     ret = func(*args, **kwargs) 
     queue.put(ret) 

    def run(self, func, *args, **kwargs): 
     args2 = [func, self.queue, args, kwargs] 
     p = Process(target=self._wrapper, args=args2) 
     self.processes.append(p) 
     p.start() 

    def wait(self): 
     rets = [] 
     for p in self.processes: 
      ret = self.queue.get() 
      rets.append(ret) 
     for p in self.processes: 
      p.join() 
     return rets 

# tester 
if __name__ == "__main__": 
    mp = Multiprocessor() 
    num_proc = 64 
    for _ in range(num_proc): # queue up multiple tasks running `sum` 
     mp.run(sum, [1, 2, 3, 4, 5]) 
    ret = mp.wait() # get all results 
    print(ret) 
    assert len(ret) == num_proc and all(r == 15 for r in ret) 
相關問題