python3 multiprocessing.Process approach fails

I saw somewhere a hint on how to process a large dataset (say, lines of text) with the multiprocessing module, roughly like this:

# ... form batch_set = nump batches, i.e. a list of lists of strings,
#     each inner list being a batch of lines to process ...
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
             for i in range(nump)]

for p in processes:
    p.start()
for p in processes:
    p.join()

results = sorted([output.get() for p in processes])
# ... do something with the processed outputs, e.g. print them in order,
#     given that each proc_lines function returns a pair (i, out_batch) ...

However, when I run the code with a small number of lines/batches it works fine [e.g. './code.py -x 4:10' for nump = 4 and numb = 10 (lines/batch)], while at a certain number of lines/batches it hangs [e.g. './code.py -x 4:4000'], and when I interrupt it I see a traceback hinting at _wait_for_tstate_lock and the system threading library. It seems the code never reaches the last line shown above...

I provide the code below, in case somebody needs it to answer why this happens and how to fix it.

#!/usr/bin/env python3

import sys
import multiprocessing as mp


def fabl(numb, nump):
    '''
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
    '<idx> my line here' with indexes from 1 to (nump x numb).
    '''
    ret = []
    idx = 1
    for _ in range(nump):
        cb = []
        for _ in range(numb):
            cb.append('%07d my line here' % idx)
            idx += 1
        ret.append(cb)
    return ret


def proc_lines(i, output, rows_in):
    ret = []
    for row in rows_in:
        row = row[0:8] + 'some other stuff\n'  # replacement for the post-idx part
        ret.append(row)

    output.put((i, ret))
    return


def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(nump)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print('waiting for procs to complete...')
    results = sorted([output.get() for p in processes])
    return results


def write_set(proc_batch_set, fout):
    'write p[rocessed] batch_set'
    for _, out_batch in proc_batch_set:
        for row in out_batch:
            fout.write(row)
    return


def main():
    args = sys.argv
    if len(args) < 2:
        print('''
    run with args: -x [ NumProc:BatchSize ]
     (ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...))
        ''')
        sys.exit(0)

    numb = 10  # suppose we need this number of lines/batch : BatchSize
    nump = 4   # number of processes to use                 : NumProcs
    if len(args) > 2 and ':' in args[2]:  # use another np:bs
        nump, numb = map(int, args[2].split(':'))

    batch_set = fabl(numb, nump)  # proc-batch made in here: nump (groups) x numb (lines)
    proc_batch_set = mp_proc(batch_set)

    with open('out-min', 'wt') as fout:
        write_set(proc_batch_set, fout)

    return


if __name__ == '__main__':
    main()

Answer


A Queue has a limited capacity and can fill up if you do not empty it while the Process is running. This does not stop your process from executing, but you will not be able to join the Process if a put could not complete.
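
As a minimal, self-contained illustration of that behaviour (this sketch is not part of the original answer; the 10 MB payload is just an arbitrary size assumed to exceed the OS pipe buffer behind the Queue), joining before reading hangs, while reading first lets join() return:

import multiprocessing as mp

def worker(q):
    # A single large put: the queue's feeder thread cannot flush all of it
    # into the underlying pipe until the parent starts reading.
    q.put('x' * (10 * 1024 * 1024))

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    # p.join()       # would hang here: the child cannot exit while its put() is pending
    data = q.get()   # drain first ...
    p.join()         # ... and the join then returns immediately
    print(len(data))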

So I would just modify the mp_proc function so that:

def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    n_process = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(n_process)]

    for p in processes:
        p.start()

    # Empty the queue while the processes are running, so there is no
    # issue with incomplete `put` operations.
    results = sorted([output.get() for p in processes])

    # Join the processes to make sure everything finished correctly.
    for p in processes:
        p.join()

    return results

So, basically, this means I was misusing the queue by trying to add too much to it, which blocked things. I found this post saying the same: http://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em, so that may be the issue. Thanks! – vuvu


I replaced the p.join() statement with a slightly bigger block, right after p.start(), that keeps reading from the queue as long as items are stored in it and, at the end, checks that all the sub-processes have finished; this works even for very large batches. – vuvu
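
The comment does not show that replacement block; a possible sketch of it, reusing proc_lines and the mp import from the script above (the 0.1 s timeout, the queue.Empty handling and the exact loop shape are assumptions, not the commenter's actual code):

import queue  # for queue.Empty raised by mp.Queue.get(timeout=...)

def mp_proc(batch_set):
    'variant of mp_proc that drains the queue while the workers are still running (sketch)'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(nump)]

    for p in processes:
        p.start()

    results = []
    # Keep pulling results while any worker is alive, so no put() can stay
    # blocked on a full queue; a worker cannot exit before its data is flushed.
    while any(p.is_alive() for p in processes):
        try:
            results.append(output.get(timeout=0.1))
        except queue.Empty:
            pass

    # Whatever remains already sits in the pipe buffer (otherwise the worker
    # that produced it would still be alive), so drain it and then join.
    while not output.empty():
        results.append(output.get())
    for p in processes:
        p.join()

    return sorted(results)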