
I'm experimenting with multithreading in Python (3.4) and have a question about the code below. A deadlock occurs when task_done() is called, because the item has already been pulled off by another thread.

The code works fine as long as my workload is larger than NUM_WORKER_THREADS; however, once the queue shrinks below NUM_WORKER_THREADS, the gap between the items.get() and task_done() calls means a new iteration may grab the same item, which leads to a deadlock when task_done() is called.

What is the correct way to handle this?

import time
import threading

from queue import Queue

NUM_WORKER_THREADS = 8

def worker():
    try:
        while items.qsize() > 0:
            print("{} items left to process".format(items.qsize()))
            item = items.get()
            print("Processing {}".format(item))
            itemrec = getItemRecord(item)  # external call to webservice, ~3 second response
            items.task_done()
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)


# start counter to monitor performance
start = time.perf_counter()

items = Queue()
# get the items we need to work on for allocations
searchResults = getSearchResults()  # external call to webservice

# add results of search to a collection
for itemid in searchResults:
    if itemid['recordtype'] == 'inventoryitem':
        items.put(itemid['id'])

for i in range(NUM_WORKER_THREADS):
    try:
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

items.join()

# print end of execution performance counter
print('time:', time.perf_counter() - start)

Answer


I would use a sentinel to tell the workers to shut down when there are no more work items to process, rather than relying on the Queue size, which is vulnerable to race conditions:

import time
import threading

from queue import Queue

NUM_WORKER_THREADS = 8

def worker():
    for item in iter(items.get, None):
        try:
            print("{} items left to process".format(items.qsize()))
            print("Processing {}".format(item))
        except Exception as inst:
            print("---------------EXCEPTION OCCURRED----------------")
            print(type(inst))
            print(inst.args)
            print(inst)
        finally:
            items.task_done()
    print("Got sentinel, shut down")
    items.task_done()

# start counter to monitor performance
start = time.perf_counter()

items = Queue()
# get the items we need to work on for allocations
searchResults = getSearchResults()  # external call to webservice

# add results of search to a collection
for itemid in searchResults:
    if itemid['recordtype'] == 'inventoryitem':
        items.put(itemid['id'])

for _ in range(NUM_WORKER_THREADS):
    items.put(None)  # load a sentinel for each worker thread

for i in range(NUM_WORKER_THREADS):
    try:
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

items.join()

# print end of execution performance counter
print('time:', time.perf_counter() - start)
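
As an aside, a worker can also sidestep the qsize()/get() race directly by using a non-blocking get. This is only a minimal sketch, and it assumes, as in the question's script, that every item is already on the queue before the workers start; items and getItemRecord are the same objects defined there:

from queue import Empty

def worker():
    # items (a queue.Queue) and getItemRecord() are defined as in the question.
    # get_nowait() either returns an item or raises Empty immediately, so a
    # worker that loses the race for the last item simply exits instead of
    # blocking forever in get() after the queue has drained.
    while True:
        try:
            item = items.get_nowait()
        except Empty:
            break
        try:
            print("Processing {}".format(item))
            itemrec = getItemRecord(item)  # external call to webservice
        finally:
            items.task_done()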

Also note that you can do this more elegantly with the thread pool that Python provides (multiprocessing.dummy.Pool):

import time
from multiprocessing.dummy import Pool  # thread pool

NUM_WORKER_THREADS = 8

def worker(item):
    try:
        print("Processing {}".format(item))
        itemrec = getItemRecord(item)  # external call to webservice, ~3 second response
    except Exception as inst:
        print("---------------EXCEPTION OCCURRED----------------")
        print(type(inst))
        print(inst.args)
        print(inst)

# start counter to monitor performance
start = time.perf_counter()

# get the items we need to work on for allocations
searchResults = getSearchResults()  # external call to webservice
pool = Pool(NUM_WORKER_THREADS)
pool.map(worker, [item['id'] for item in searchResults
                  if item['recordtype'] == 'inventoryitem'])
pool.close()
pool.join()

# print end of execution performance counter
print('time:', time.perf_counter() - start)
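
For comparison, the standard library's concurrent.futures.ThreadPoolExecutor gives an equivalent thread pool; here is a minimal sketch along the same lines, reusing the getSearchResults and getItemRecord web-service calls from above:

import time
from concurrent.futures import ThreadPoolExecutor

NUM_WORKER_THREADS = 8

def worker(item):
    print("Processing {}".format(item))
    return getItemRecord(item)  # external call to webservice

start = time.perf_counter()

searchResults = getSearchResults()  # external call to webservice
ids = [item['id'] for item in searchResults
       if item['recordtype'] == 'inventoryitem']

# executor.map() yields results as workers finish; consuming it blocks until
# every item has been processed, and leaving the with-block shuts the pool down.
with ThreadPoolExecutor(max_workers=NUM_WORKER_THREADS) as executor:
    results = list(executor.map(worker, ids))

print('time:', time.perf_counter() - start)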

Thanks - I'll give it a spin.