2010-07-20 123 views
140

當我運行類似:多處理:如何在類中定義的函數上使用Pool.map?

from multiprocessing import Pool 

p = Pool(5) 
def f(x): 
    return x*x 

p.map(f, [1,2,3]) 

它工作正常。但是,把這個作爲一類功能:

class calculate(object): 
    def run(self): 
     def f(x): 
      return x*x 

     p = Pool() 
     return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 

使我有以下錯誤:

Exception in thread Thread-1: 
Traceback (most recent call last): 
    File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner 
    self.run() 
    File "/sw/lib/python2.6/threading.py", line 484, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我已經看到亞歷克斯·馬爾泰利處理同類問題的帖子,但它不夠明確。

+1

「這是一個類的功能」?你能發佈實際得到實際錯誤的代碼嗎?沒有實際的代碼,我們只能猜測你做錯了什麼。 – 2010-07-20 10:05:39

+1

@ S.Lott我發佈了代碼 – Mermoz 2010-07-20 12:12:38

+0

作爲一般性評論,存在比Python的標準pickle模塊更強大的pickling模塊(比如[picloud](https://pypi.python.org/pypi/cloud/2.7.2 )模塊中提到[這個答案](http://stackoverflow.com/a/16626757/2292832))。 – 2013-08-20 15:50:40

回答

57

我也很受限於什麼樣的函數pool.map可以接受。我寫了以下來繞過這個。它似乎工作,即使遞歸使用parmap。

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    [p.join() for p in proc] 
    return [p.recv() for (p,c) in pipe] 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+1

這對我來說非常好,謝謝。我發現了一個弱點:我嘗試在一些傳遞給defaultdict的函數上使用parmap,並再次得到PicklingError。我沒有找到解決辦法,我只是重寫我的代碼不使用defaultdict。 – sans 2011-07-08 23:41:22

+2

這並不在Python工作2.7.2(默認情況下,2011年6月12日,15點08分59秒)[MSC v.1500 32位(英特爾)]在Win32 – ubershmekel 2012-02-18 19:03:21

+3

這確實關於Python 2.7.3月1,2012工作,05:14:39。 這並不巨iterables工作 - >它會導致OSERROR:[錯誤24]打開的文件太多,由於它打開管道的數量。 – 2013-01-18 19:19:22

7

在類中定義的函數(即使在類內的函數內)也不會真正的醃製。然而,這個工程:

def f(x): 
    return x*x 

class calculate(object): 
    def run(self): 
     p = Pool() 
    return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 
+12

謝謝,但是我覺得在類之外定義函數有點髒。該類應該捆綁它實現給定任務所需的全部內容。 – Mermoz 2010-07-20 12:53:10

+3

@Memoz:「班級應該把所有需要的東西捆綁在一起」真的嗎?我找不到很多這樣的例子。大多數類依賴於其他類或函數。爲什麼調用類的依賴「髒」?依賴關係有什麼問題? – 2010-07-20 12:59:40

+0

那麼,函數不應該修改現有的類數據 - 因爲它會修改其他進程中的版本 - 所以它可能是一個靜態方法。您可以排序的醃製一個靜態方法:http://stackoverflow.com/questions/1914261/pickling-a-staticmethod-in-python/1914798#1914798 或者,什麼這個簡單,你可以使用lambda。 – robert 2010-07-20 15:22:33

39

目前還沒有解決您的問題,據我知道:你給map()功能必須通過模塊的進口訪問。這就是爲什麼羅伯特的代碼工作:f()可以通過導入下面的代碼獲得功能:

def f(x): 
    return x*x 

class Calculate(object): 
    def run(self): 
     p = Pool() 
     return p.map(f, [1,2,3]) 

if __name__ == '__main__': 
    cl = Calculate() 
    print cl.run() 

其實我增加了一個「主要」部分,因爲這是繼recommendations for the Windows platform(「確保主模塊可由新的Python解釋器安全地導入,而不會造成意想不到的副作用「)。

我還在Calculate前加了一個大寫字母,以便跟在PEP 8之後。 :)

12

我也一直在努力。我有用作類的數據成員,作爲一個簡化的例子:

from multiprocessing import Pool 
import itertools 
pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # Needed to do something like this (the following line won't work) 
     return pool.map(self.f,list1,list2) 

我需要在同一類內使用功能self.f在Pool.map()調用從和self.f沒以一個元組爲參數。由於這個函數被嵌入到一個類中,所以我不清楚如何編寫包裝類型的其他答案。

我解決了這個問題,它使用了一個不同的包裝器,它使用一個元組/列表,其中第一個元素是該函數,其餘元素是該函數的參數,稱爲eval_func_tuple(f_args)。使用這個,有問題的行可以被替換爲返回pool.map(eval_func_tuple,itertools.izip(itertools.repeat(self.f),list1,list2))。下面是完整的代碼:

文件:util.py

def add(a, b): return a+b 

def eval_func_tuple(f_args): 
    """Takes a tuple of a function and args, evaluates and returns result""" 
    return f_args[0](*f_args[1:]) 

文件:main.py

from multiprocessing import Pool 
import itertools 
import util 

pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # The following line will now work 
     return pool.map(util.eval_func_tuple, 
      itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__': 
    myExample = Example(util.add) 
    list1 = [1, 2, 3] 
    list2 = [10, 20, 30] 
    print myExample.add_lists(list1, list2) 

運行main.py會給[11,22,33]。隨意改進這一點,例如eval_func_tuple也可以修改爲採用關鍵字參數。

另一個說明,在另一個答案中,功能「parmap」可以更有效地處理比可用CPU數量更多的進程。我正在複製下面的編輯版本。這是我的第一篇文章,我不確定是否應該直接編輯原始答案。我也重命名了一些變量。

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    numProcesses = len(processes) 
    processNum = 0 
    outputList = [] 
    while processNum < numProcesses: 
     endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses) 
     for proc in processes[processNum:endProcessNum]: 
      proc.start() 
     for proc in processes[processNum:endProcessNum]: 
      proc.join() 
     for proc,c in pipe[processNum:endProcessNum]: 
      outputList.append(proc.recv()) 
     processNum = endProcessNum 
    return outputList  

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5))   
17

通過mrule該解決方案是正確的,但有一個錯誤:如果孩子發回大量數據,它可以填補管道緩衝區,阻止孩子的pipe.send(),而家長在等待孩子退出pipe.join()。解決方案是在兒童讀取兒童數據之前。此外,孩子應該關閉父母的管道末端以防止死鎖。下面的代碼解決了這個問題。另請注意,此parmap會在X中爲每個元素創建一個進程。更高級的解決方案是使用multiprocessing.cpu_count()X分成多個塊,然後在返回之前合併結果。我把這個作爲練習留給讀者,以免破壞mrule的簡潔性。 ;)

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(ppipe, cpipe,x): 
     ppipe.close() 
     cpipe.send(f(x)) 
     cpipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    ret = [p.recv() for (p,c) in pipe] 
    [p.join() for p in proc] 
    return ret 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+0

你如何選擇進程數量? – 2016-04-27 13:39:52

+0

它工作!謝謝。沒有其他解決方案適用於我:D – 2016-04-27 14:36:18

+0

但是,由於錯誤「OSError:[Errno 24]太多打開的文件」,它會很快死亡。我認爲需要對進程數量進行某種限制才能正常工作...... – 2016-04-27 15:01:04

69

,因爲使用「multiprocessing.Pool」不lambda表達式的工作代碼的代碼,而不是使用「multiprocessing.Pool」因爲有產卵多達過程我不能使用至今發佈的代碼工作項目。

我改編了代碼s.t.它會產生預定義數量的工作人員,並且只會在輸入列表中迭代(如果存在空閒工作人員)。我還爲員工提供了「守護進程」模式。 ctrl-c按預期工作。

import multiprocessing 


def fun(f, q_in, q_out): 
    while True: 
     i, x = q_in.get() 
     if i is None: 
      break 
     q_out.put((i, f(x))) 


def parmap(f, X, nprocs=multiprocessing.cpu_count()): 
    q_in = multiprocessing.Queue(1) 
    q_out = multiprocessing.Queue() 

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out)) 
      for _ in range(nprocs)] 
    for p in proc: 
     p.daemon = True 
     p.start() 

    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [q_in.put((None, None)) for _ in range(nprocs)] 
    res = [q_out.get() for _ in range(len(sent))] 

    [p.join() for p in proc] 

    return [x for i, x in sorted(res)] 


if __name__ == '__main__': 
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8])) 
+2

如何獲得進度條以正確使用此parmap功能? – shockburner 2014-07-19 00:33:36

+2

一個問題 - 我使用了這個解決方案,但注意到我產生的python進程在內存中保持活動狀態。任何快速思考如何在你的parmap退出時殺死那些人? – CompEcon 2014-11-15 13:19:15

+1

@ klaus-se我知道我們不鼓勵在評論中表示感謝,但您的回答對我來說太重要了,我無法抗拒。我希望我可以給你的不僅僅是一個聲譽更多... – deshtop 2015-07-30 17:58:05

34

除非您跳出標準庫,否則多處理和酸洗會受到破壞和限制。

如果使用名爲pathos.multiprocesssingmultiprocessing的分支,可以在多處理的map函數中直接使用類和類方法。這是因爲使用了dill而不是picklecPickledill可以序列化幾乎所有的python。

pathos.multiprocessing還提供了異步映射功能......它可以map功能與多個參數(如map(math.pow, [1,2,3], [4,5,6])

見討論: What can multiprocessing and dill do together?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至處理你最初編寫的代碼,不需要修改,也可以來自口譯員。爲什麼還有其他更脆弱和更具體的案例呢?

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> class calculate(object): 
... def run(self): 
... def f(x): 
... return x*x 
... p = Pool() 
... return p.map(f, [1,2,3]) 
... 
>>> cl = calculate() 
>>> print cl.run() 
[1, 4, 9] 

獲得此代碼: https://github.com/uqfoundation/pathos

而且,只是爲了炫耀多一點的可以做什麼:

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> 
>>> p = Pool(4) 
>>> 
>>> def add(x,y): 
... return x+y 
... 
>>> x = [0,1,2,3] 
>>> y = [4,5,6,7] 
>>> 
>>> p.map(add, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> 
>>> p.map(Test.plus, [t]*4, x, y) 
[4, 6, 8, 10] 
>>> 
>>> res = p.amap(t.plus, x, y) 
>>> res.get() 
[4, 6, 8, 10] 
+0

pathos.multiprocessing也有一個異步映射('amap'),可以使用進度條和其他異步編程。 – 2014-04-15 14:05:36

+0

我喜歡pathos.multiprocessing,它可以在享受多處理的同時幾乎爲非平行地圖提供一個直接替換。我有pathos.multiprocessing.map的一個簡單的包裝,使得它更內存效率時處理只讀大型數據在多個內核結構,請參見[本git倉庫(https://github.com/fashandge/ python_parmap)。 – Fashandge 2014-12-29 02:50:03

+0

似乎很有趣,但它不會安裝。這是pip給出的信息:'無法找到滿足要求的版本pp == 1.5.7-pathos(來自病態)' – xApple 2016-05-18 14:52:01

3

我修改克勞斯本身的方法,因爲儘管這是工作對於我的小列表,當項目數量大於或等於1000時,它會掛起。我不是一次一個地用None停止條件推送一個作業,而是一次加載輸入隊列,只讓進程在其上進行咬合,直到它爲空。

from multiprocessing import cpu_count, Queue, Process 

def apply_func(f, q_in, q_out): 
    while not q_in.empty(): 
     i, x = q_in.get() 
     q_out.put((i, f(x))) 

# map a function using a pool of processes 
def parmap(f, X, nprocs = cpu_count()): 
    q_in, q_out = Queue(), Queue() 
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)] 
    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [p.start() for p in proc] 
    res = [q_out.get() for _ in sent] 
    [p.join() for p in proc] 

    return [x for i,x in sorted(res)] 

編輯:可惜現在我遇到了我的系統上這個錯誤:Multiprocessing Queue maxsize limit is 32767,希望變通辦法將有幫助。

5

我採取了klaus se's和aganders3的答案,並且製作了一個更具可讀性的文檔化模塊,並將其保存在一個文件中。您可以將其添加到您的項目中。它甚至有一個可選的進度條!

""" 
The ``processes`` module provides some convenience functions 
for using parallel processes in python. 

Adapted from http://stackoverflow.com/a/16071616/287297 

Example usage: 

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) 

Comments: 

"It spawns a predefined amount of workers and only iterates through the input list 
if there exists an idle worker. I also enabled the "daemon" mode for the workers so 
that KeyboardInterupt works as expected." 

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. 

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess 
""" 

# Modules # 
import multiprocessing 
from tqdm import tqdm 

################################################################################ 
def apply_function(func_to_apply, queue_in, queue_out): 
    while not queue_in.empty(): 
     num, obj = queue_in.get() 
     queue_out.put((num, func_to_apply(obj))) 

################################################################################ 
def prll_map(func_to_apply, items, cpus=None, verbose=False): 
    # Number of processes to use # 
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) 
    # Create queues # 
    q_in = multiprocessing.Queue() 
    q_out = multiprocessing.Queue() 
    # Process list # 
    new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) 
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] 
    # Put all the items (objects) in the queue # 
    sent = [q_in.put((i, x)) for i, x in enumerate(items)] 
    # Start them all # 
    for proc in processes: 
     proc.daemon = True 
     proc.start() 
    # Display progress bar or not # 
    if verbose: 
     results = [q_out.get() for x in tqdm(range(len(sent)))] 
    else: 
     results = [q_out.get() for x in range(len(sent))] 
    # Wait for them to finish # 
    for proc in processes: proc.join() 
    # Return results # 
    return [x for i, x in sorted(results)] 

################################################################################ 
def test(): 
    def slow_square(x): 
     import time 
     time.sleep(2) 
     return x**2 
    objs = range(20) 
    squares = prll_map(slow_square, objs, 4, verbose=True) 
    print "Result: %s" % squares 

編輯:添加@亞歷山大 - 麥克法蘭的建議和測試功能

+0

您的進度條有一個問題...該條只能衡量工作負載在處理器中的分配效率如何。如果工作負載完全分離,那麼所有處理器將同時加入(),並且只會在'tqdm'顯示中完成'100%'的閃光。只有每個處理器有偏差的工作負載 – 2016-06-30 21:17:23

+1

移動'tqdm()'來包裝行:'result = [q_out.get()for _ in tqdm(sent)]'並且它工作很多更好 - 很大的努力,雖然真的很感謝這個+1 – 2016-06-30 21:22:01

+0

感謝您的建議,我會嘗試一下,然後更新答案! – xApple 2016-07-02 10:49:56

0

我不知道,如果這種方法被徵用,但一個工作,我周圍正在使用的是:

from multiprocessing import Pool 

t = None 

def run(n): 
    return t.f(n) 

class Test(object): 
    def __init__(self, number): 
     self.number = number 

    def f(self, x): 
     print x * self.number 

    def pool(self): 
     pool = Pool(2) 
     pool.map(run, range(10)) 

if __name__ == '__main__': 
    t = Test(9) 
    t.pool() 
    pool = Pool(2) 
    pool.map(run, range(10)) 

輸出應該是:

0 
9 
18 
27 
36 
45 
54 
63 
72 
81 
0 
9 
18 
27 
36 
45 
54 
63 
72 
81 
0
class Calculate(object): 
    # Your instance method to be executed 
    def f(self, x, y): 
    return x*y 

if __name__ == '__main__': 
    inp_list = [1,2,3] 
    y = 2 
    cal_obj = Calculate() 
    pool = Pool(2) 
    results = pool.map(lambda x: cal_obj.f(x, y), inp_list) 

您可能希望將這個函數應用於每個不同類的實例。然後這裏是也

class Calculate(object): 
    # Your instance method to be executed 
    def __init__(self, x): 
    self.x = x 

    def f(self, y): 
    return self.x*y 

if __name__ == '__main__': 
    inp_list = [Calculate(i) for i in range(3)] 
    y = 2 
    pool = Pool(2) 
    results = pool.map(lambda x: x.f(y), inp_list) 
3

我知道這是在6年前問了,但只是想補充我的解決方案,因爲一些上述建議看起來可怕複雜的解決方案,但我的解決辦法其實很簡單。

我只需要將pool.map()調用包裝到一個輔助函數中。將類對象與args一起作爲元組傳遞給方法,看起來有點像這樣。

def run_in_parallel(args): 
    return args[0].method(args[1]) 

myclass = MyClass() 
method_args = [1,2,3,4,5,6] 
args_map = [ (myclass, arg) for arg in method_args ] 
pool = Pool() 
pool.map(run_in_parallel, args_map) 
0

這是我的解決方案,我覺得這裏的解決方案比其他大多數人都少。這與睡衣的答案類似。

someclasses = [MyClass(), MyClass(), MyClass()] 

def method_caller(some_object, some_method='the method'): 
    return getattr(some_object, some_method)() 

othermethod = partial(method_caller, some_method='othermethod') 

with Pool(6) as pool: 
    result = pool.map(othermethod, someclasses) 
相關問題