2013-04-07 79 views
2

我試圖用流程編寫生產者消費者的簡單代碼。生產者是一個過程。對於消費者,我從Pool獲取進程。Python生產者/消費者與進程和池

from multiprocessing import Manager, Process, Pool 
from time import sleep 
def writer(queue): 
    for i in range(10): 
    queue.put(i) 
    print 'put 1 size now ',queue.qsize() 
    sleep(1) 

def reader(queue): 
    print 'in reader' 
    for i in range(10): 
    queue.get(1) 
    print 'got 1 size now ', queue.qsize() 

if __name__ == '__main__': 
    q = Manager().Queue() 
    p = Process(target=writer, args=(q,)) 
    p.start() 
    pool = Pool() 
    c = pool.apply_async(reader,q) 

但我發現了錯誤,

Process Process-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
    self._target(*self._args, **self._kwargs) 
    File "pc.py", line 5, in writer 
    queue.put(i) 
    File "<string>", line 2, in put 
    File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod 
    conn.send((self._id, methodname, args, kwds)) 
IOError: [Errno 32] Broken pipe 

任何人都可以點我,我該怎麼錯在何處。

+0

我來到這裏尋找相同的答案,我仍然試圖找到一個很好的教程,所以如果我工作了,我將會回來。如果你找到一個好的資源,請告訴我。乾杯! – unixsnob 2013-04-09 10:18:50

+1

本教程對理解多處理有很大幫助http://pymotw.com/2/multiprocessing/communication.html – unixsnob 2013-04-12 13:51:39

回答

5

您好我找到了這個答案,

from multiprocessing import Manager, Process, Pool,Queue 
from Queue import Empty 

def writer(queue): 
    for i in range(10): 
    queue.put(i) 
    print 'put %i size now %i'%(i, queue.qsize()) 

def reader(id, queue): 
    for i in range(10): 
    try: 
     cnt = queue.get(1,1) 
     print '%i got %i size now %i'%(id, cnt, queue.qsize()) 
    except Empty: 
     pass 

class Managerss: 
    def __init__(self): 
    self.queue= Queue() 
    self.NUMBER_OF_PROCESSES = 3 

    def start(self): 
    self.p = Process(target=writer, args=(self.queue,)) 
    self.p.start() 
    self.workers = [Process(target=reader, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
    for w in self.workers: 
     w.start() 

    def join(self): 
    self.p.join() 
    for w in self.workers: 
     w.join() 

if __name__ == '__main__': 
    m= Managerss() 
    m.start() 
    m.join() 

希望它可以幫助