2011-07-12 56 views
10

我想在Python中使用多處理庫中的隊列。在執行下面的代碼之後(打印語句正常工作),但在我調用加入隊列並且仍然存在之後,進程不會退出。我怎樣才能終止剩餘的進程?Python中的多處理隊列

謝謝!

def MultiprocessTest(self): 
    print "Starting multiprocess." 
    print "Number of CPUs",multiprocessing.cpu_count() 

    num_procs = 4 
    def do_work(message): 
    print "work",message ,"completed" 

    def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

    q = multiprocessing.JoinableQueue() 
    for i in range(num_procs): 
    p = multiprocessing.Process(target=worker) 
    p.daemon = True 
    p.start() 

    source = ['hi','there','how','are','you','doing'] 
    for item in source: 
    q.put(item) 
    print "q close" 
    q.join() 
    #q.close() 
    print "Finished everything...." 
    print "num active children:",multiprocessing.active_children() 

回答

7

試試這個:

import multiprocessing 

num_procs = 4 
def do_work(message): 
    print "work",message ,"completed" 

def worker(): 
    for item in iter(q.get, None): 
    do_work(item) 
    q.task_done() 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    procs.append(multiprocessing.Process(target=worker)) 
    procs[-1].daemon = True 
    procs[-1].start() 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

q.join() 

for p in procs: 
    q.put(None) 

q.join() 

for p in procs: 
    p.join() 

print "Finished everything...." 
print "num active children:", multiprocessing.active_children() 
+0

你有沒有任何理由在完成後將None放入隊列中?我認爲task_done()可以幫助避免這個問題?我試圖在本頁面底部的示例之後對代碼進行建模:http://docs.python.org/library/queue.html – aerain

+0

這實際上不工作:( – aerain

+0

未對解決方案進行評級,但暗示如何讓它運行:在def worker的第一次使用之前移動「q =」聲明行()... ;-) – Dilettant

3

在參加該過程之前清除隊列,但q.empty()是不可靠的。

清除隊列的最好方法是統計成功獲取或循環的次數,直到您收到一個sentinel值,就像擁有可靠網絡的套接字一樣。

6

你的工作人員需要一個哨兵終止,否則他們只會坐在封鎖的閱讀。需要注意的是使用在Q睡眠,而不是加入在P,可以顯示狀態信息等
我的最佳模板是:

def worker(q,nameStr): 
    print 'Worker %s started' %nameStr 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 
    q.task_done() 
    print 'Worker %s Finished' % nameStr 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    nameStr = 'Worker_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    p.daemon = True 
    p.start() 
    procs.append(p) 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

for i in range(num_procs): 
    q.put(None) # send termination sentinel, one for each process 

while not q.empty(): # wait for processing to finish 
    sleep(1) # manage timeouts and status updates etc. 
+1

雖然不是q.empty(),但是隻有當工作人員抓住要完成的最後一項工作時,才能知道處理已完成。坦率地說,如果你不恰當地使用JoinableQueue,你不需要一個JoinableQueue。如果您選擇不使用某個工作線程,則不需要工作線程標記task_done。使用這樣一個隊列的目的是讓你可以加入它,這正是你在這個程序結束時所要做的,而不是等待隊列爲空。 – leetNightshade

+0

是的,用這種方法,工作提前結束。 – Forethinker

1

下面的代碼可能不是很相關,但我將它張貼您的意見/反饋,所以我們可以一起學習。謝謝!

import multiprocessing 

def boss(q,nameStr): 
    source = range(1024) 
    for item in source: 
    q.put(nameStr+' '+str(item)) 
    q.put(None) # send termination sentinel, one for each process 

def worker(q,nameStr): 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 

q = multiprocessing.Queue() 

procs = [] 

num_procs = 4 
for i in range(num_procs): 
    nameStr = 'ID_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    procs.append(p) 
    p = multiprocessing.Process(target=boss, args=(q,nameStr)) 
    procs.append(p) 

for j in procs: 
    j.start() 
for j in procs: 
    j.join() 
0

這裏是你把一些任務上JoinableQueue,然後啓動消耗的任務和退出,一旦他們閱讀的隊列「幹」的工作進程比較簡單的情況下,前哨免費方法。訣竅是使用JoinableQueue.get_nowait()而不是get()。顧名思義,get_nowait()試圖以非阻塞的方式從隊列中獲取值,並且如果沒有什麼可以獲得,則會引發一個queue.Empty異常。工人通過退出來處理這個異常。

基本的代碼來說明一個道理:

import multiprocessing as mp 
from queue import Empty 

def worker(q): 
    while True: 
    try: 
     work = q.get_nowait() 
     # ... do something with `work` 
     q.task_done() 
    except Empty: 
     break # completely done 

# main 
worknum = 4 
jq = mp.JoinableQueue() 

# fill up the task queue 
# let's assume `tasks` contains some sort of data 
# that your workers know how to process 
for task in tasks: 
    jq.put(task) 

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ] 
for p in procs: 
    p.start() 

for p in procs: 
    p.join() 

的好處是,你不需要把「毒丸」上的隊列,以便代碼是有點短。

重要:在生產者和消費者使用相同的隊列中「交織」的方式與工人可能要等待新的任務來沿着更復雜的情況下,「毒丸」的做法應該是用過的。我上面的建議是針對工人「知道」如果任務隊列是空的簡單情況,那麼再沒有任何問題了。