2014-10-27 102 views
7

我有幾十萬個文本文件,我想以各種方式進行解析。我想將輸出保存到單個文件而沒有同步問題。我一直在使用多處理池來節省時間,但我無法弄清楚如何組合Pool和Queue。Python:在使用多處理時使用隊列寫入單個文件池

以下代碼將保存infile名稱以及文件中連續「x」的最大數目。但是,我希望所有進程都將結果保存到同一個文件中,而不是像我的示例中那樣保存到不同的文件中。任何幫助,將不勝感激。

import multiprocessing 

with open('infilenamess.txt') as f: 
    filenames = f.read().splitlines() 

def mp_worker(filename): 
with open(filename, 'r') as f: 
     text=f.read() 
     m=re.findall("x+", text) 
     count=len(max(m, key=len)) 
     outfile=open(filename+'_results.txt', 'a') 
     outfile.write(str(filename)+'|'+str(count)+'\n') 
     outfile.close() 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    p.map(mp_worker, filenames) 

if __name__ == '__main__': 
    mp_handler() 

回答

21

多處理池爲您實現隊列。只需使用一個將工作返回值返回給調用者的池方法即可。 imap運作良好:

import multiprocessing 
import re 

def mp_worker(filename): 
    with open(filename) as f: 
     text = f.read() 
    m = re.findall("x+", text) 
    count = len(max(m, key=len)) 
    return filename, count 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    with open('infilenamess.txt') as f: 
     filenames = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, filenames): 
      # (filename, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 
+0

因此,我循環遍歷一個結果,並將它們寫入文件,因爲它們進來?這是否意味着新員工在每個「結果」寫完之前都不會啓動,或者每次都運行,但是會等待寫入? 另外,你能解釋爲什麼你用[line for line in(l.strip()for f in line)替換我的f.read()。splitlines()如果line]? – risraelsen 2014-10-28 01:04:29

+0

32個進程在後臺運行,並在「塊」中獲取更多文件名,因爲它們會將結果傳遞迴父進程。結果會立即傳回,以便母公司並行執行其工作。一行一行地讀取文件比一次讀取文件並稍後拆分文件更有效率......這就是列表的目的。 – tdelaney 2014-10-28 02:15:08

+2

最佳答案/ example – zach 2014-12-11 20:46:09

3

我接受了接受的答案並簡化了它,以便我自己理解這是如何工作的。我在這裏發佈它,以便它可以幫助別人。

import multiprocessing 

def mp_worker(number): 
    number += 1 
    return number 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    numbers = list(range(1000)) 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, numbers): 
      f.write('%d\n' % result) 

if __name__=='__main__': 
    mp_handler()