2016-10-01 143 views
2

我想通過隊列使一個分析器多線程。它似乎工作,但我的隊列掛起。如果有人能告訴我如何解決這個問題,我會很感激,因爲我很少寫多線程代碼。Python中的線程隊列掛起

此代碼從Q寫着:

from silk import * 
import json 
import datetime 
import pandas 
import Queue 
from threading import Thread 

l = [] 
q = Queue.Queue() 

def parse_record(): 
    d = {} 
    while not q.empty(): 
     rec = q.get() 
     d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
     # ... many ops like this 
     d['dport'] = rec.dport 
     l.append(d) # l is global 

這充滿問:

def parse_records(): 
    ffile = '/tmp/query.rwf' 
    flows = SilkFile(ffile, READ) 
    numthreads = 2 

    # fill queue 
    for rec in flows: 
     q.put(rec) 
    # work on Queue  
    for i in range(numthreads): 
     t = Thread(target = parse_record) 
     t.daemon = True 
     t.start() 

    # blocking 
    q.join() 

    # never reached  
    data_df = pandas.DataFrame.from_records(l) 
    return data_df 

我只在我的主打電話parse_records()。它永遠不會終止。

+1

作爲一個方面說明,線程化可能會使其運行速度變慢。 python GIL只允許一次運行一個線程。 CPU綁定的工作人員不會並行運行。 – tdelaney

回答

2

Queue.empty doc說:

...如果是空的()返回FALSE,它並不能保證後續調用get()不會阻止。

至少應該使用get_nowait或風險數據丟失。但更重要的是,加入只會釋放時,所有排隊的項目都標記爲已完成了Queue.task_done電話:

如果join()方法是目前攔截,當所有項目已處理完畢,就會恢復(意思每個已放入隊列的項目都會收到一個task_done()調用。

作爲附註,l.append(d)不是原子的,應該用鎖保護。

from silk import * 
import json 
import datetime 
import pandas 
import Queue 
from threading import Thread, Lock 

l = [] 
l_lock = Lock() 
q = Queue.Queue() 

def parse_record(): 
    d = {} 
    while 1: 
     try: 
      rec = q.getnowait() 
      d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
      # ... many ops like this 
      d['dport'] = rec.dport 
      with l_lock(): 
       l.append(d) # l is global 
      q.task_done() 
     except Queue.Empty: 
      return 

通過使用標準庫中的線程池,可以大大縮短代碼。

from silk import * 
import json 
import datetime 
import pandas 
import multiprocessing.pool 

def parse_record(rec): 
    d = {} 
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
    # ... many ops like this 
    d['dport'] = rec.dport 
    return d 

def parse_records(): 
    ffile = '/tmp/query.rwf' 
    flows = SilkFile(ffile, READ) 
    pool = multiprocessing.pool.Pool(2) 
    data_df = pandas.DataFrame.from_records(pool.map(parse_record), flows) 
    pool.close() 
    return data_df 
+0

正確...這比較慢。但是我在Python中學到了很多關於多線程的知識。非常感謝你。 – wishi