2017-11-04 78 views
1
#Version1 
main_df = pd.read_csv('Million_rows.csv') 
def myfunction(args*,start,end): 
    for i in range(start,end): 
    if condition1: 
     for item in mainTreeSearch: 
      ... 
       lock.acquire() 
       ###write to main_df 
       lock.release() 
       noLuck = False 
       break 
     if noLuck and Acondition: 
      lock.acquire() 
      ###write to main_df 
      lock.release() 
    elif 
    ... various asymmetric decision trees... 

t1 = Thread(target=myfuct, args=(args*),0,250)) 
t2 = Thread(target=myfuct, args=(args*),250,500)) 
t3 = Thread(target=myfuct, args=(args*),500,750)) 
t4 = Thread(target=myfuct, args=(args*),750,1000)) 

我的問題是,我不知道如何餵食其餘行的線程,我試過Queue,失敗。如何在線程之間進行通信以請求下一個塊?

#Version2 
def myfuntion(args*,q) 
    while True: 
    q.get() 
    ....same search as above...without locking 
    q.task_done() 

q = Queue(maxsize=0) 
num_threads = 5 
threads =[] 
for i in range(num_threads): 
    worker = Thread(target=myfunction, args=(args*)) 
    worker.setDaemon(True) 
    threads.append(worker) 
    worker.start() 

for x in range(1000): 
    #time.sleep(.005) 
    q.put(x) 

q.join() 

在版本2中,而不sleep任1個螺紋豬的所有數據或隨機崩潰發生。

在版本1中,我應該使用threading.nodify()機制,如果是這樣,它是如何實現的?

回答

0

我把它格式化這個和它的作品如預期

from Queue import Queue 
import threading 

q = Queue() 


def myfuntion(q): 
    while True: 
     val = q.get() 
     print('\n' + str(threading.currentThread())) 
     print('\n' + str(val)) 
     q.task_done() 


num_threads = 5 
threads = [] 
for i in range(num_threads): 
    worker = threading.Thread(target=myfuntion, args=(q,)) 
    worker.setDaemon(True) 
    threads.append(worker) 
    worker.start() 

for x in range(1000): 
    q.put(x) 

q.join() 

檢查出來。我認爲你傳遞參數的方式是錯誤的。

+0

謝謝你的努力!但是,我也是從上述骨架開始的,排隊機制無法始終如一地工作,只是增加延遲似乎會使其穩定下來。 (我對不明確的道歉,但參數是佔位符在這個片段,而不是實際的格式) – user8588756

+0

你能詳細說明你的需求嗎? – csurfer

相關問題