2012-03-07 149 views
6

我對python相當陌生。 我正在使用多處理模塊讀取stdin上的文本行,以某種方式轉換它們並將它們寫入數據庫。下面是我的代碼片段:python pool apply_async和map_async不會阻塞完整隊列

batch = [] 
pool = multiprocessing.Pool(20) 
i = 0 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     pool.apply_async(insert, args=(batch,i+1)) 
     batch = [] 
pool.apply_async(insert, args=(batch,i)) 
pool.close() 
pool.join() 

現在,一切工作正常,直到我去處理龐大的輸入文件(億萬行),我管到我的Python程序。在某個時候,當我的數據庫變慢時,我發現內存已經滿了。

經過一番遊戲後,事實證明,pool.apply_async和pool.map_async永遠都不會阻塞,因此要處理的調用隊列變得越來越大。

什麼是正確的方法來解決我的問題?我期望一個可以設置的參數,一旦達到某個隊列長度,就會阻塞pool.apply_async調用。 Java中的AFAIR可以爲ThreadPoolExecutor提供一個具有固定長度的BlockingQueue。

謝謝!

+1

_「事實證明,pool.apply_async以及pool.map_async從來沒有阻止」 _ - 一切,我一直在尋找 – leon 2013-07-05 23:28:41

回答

2

apply_async返回AsyncResult對象,你可以wait上:

if len(batch) >= 10000: 
    r = pool.apply_async(insert, args=(batch, i+1)) 
    r.wait() 
    batch = [] 

但如果你想這樣做在一個更清潔的方式,你應該使用multiprocessing.Queue與10000 maxsize,並從中獲得一個Worker來自multiprocessing.Process的課程從這樣的隊列中獲取。

+1

以及等待的AsyncResult不利於我的問題是,在隊列游泳池變大。我想知道我是否可以控制池中內部隊列的大小? – konstantin 2012-03-07 13:46:41

+0

@konstantin:我不知道我明白了。在等待'AsyncResult'時,主進程無法填充下一批,對吧? – 2012-03-07 14:02:59

9

爲了萬一有人在這裏結束,這就是我解決問題的方法:我停止使用multiprocessing.Pool。這是我如何做到這一點現在:

#set amount of concurrent processes that insert db data 
processes = multiprocessing.cpu_count() * 2 

#setup batch queue 
queue = multiprocessing.Queue(processes * 2) 

#start processes 
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches  
batch=[] 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     queue.put((batch,i+1)) 
     batch = [] 
if batch: 
    queue.put((batch,i+1)) 

#stop processes using poison-pill 
for _ in range(processes): queue.put((None,None)) 

print "all done." 
在插入方法

每批的處理被包裹在一個循環,從隊列中拉出,直到其接收到毒藥:

while True: 
    batch, end = queue.get() 
    if not batch and not end: return #poison pill! complete! 
    [process the batch] 
print 'worker done.' 
+0

不錯的簡單例子。多處理池經常比它值得的更麻煩,特別是因爲創建你自己的進程池非常簡單。 – travc 2013-05-25 21:24:18

8

apply_async並且map_async功能的設計不會阻礙主流程。爲了做到這一點,Pool保持一個內部Queue不幸的是不可能改變大小。

問題可以解決的方法是通過使用Semaphore初始化您希望隊列的大小。在飼料池和工人完成任務之前,您獲取並釋放信號量。

下面是一個使用Python 2.6或更高版本的例子。

from threading import Semaphore 
from multiprocessing import Pool 

def task_wrapper(f): 
    """Python2 does not allow a callback for method raising exceptions, 
    this wrapper ensures the code run into the worker will be exception free. 

    """ 
    try: 
     return f() 
    except: 
     return None 

def TaskManager(object): 
    def __init__(self, processes, queue_size): 
     self.pool = Pool(processes=processes) 
     self.workers = Semaphore(processes + queue_size) 

    def new_task(self, f): 
     """Start a new task, blocks if queue is full.""" 
     self.workers.acquire() 
     self.pool.apply_async(task_wrapper, args=(f,), callback=self.task_done)) 

    def task_done(self): 
     """Called once task is done, releases the queue is blocked.""" 
     self.workers.release()