2013-03-20 124 views
6

我知道Stack Exchange上有很多帖子與將多處理結果寫入單個文件相關,並且在閱讀完這些帖子後我開發了自己的代碼。我試圖實現的是並行運行「RevMapCoord」函數,並使用multiprocess.queue將其結果寫入單個文件中。但是我排隊工作時遇到問題。我的代碼:使用隊列寫入同一文件的Python多重處理

def RevMapCoord(list): 
    "Read a file, Find String and Do something" 

def feed(queue, parlist): 
    for par in parlist: 
     print ('Echo from Feeder: %s' % (par)) 
     queue.put(par) 
    print ('**Feeder finished queing**') 

def calc(queueIn, queueOut): 
    print ('Worker function started') 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      res = RevMapCoord(final_res) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 


feedProc = Process(target = feed , args = (workerQueue, final_res)) 
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)] 
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno)) 

feedProc.start() 
print ('Feeder is joining') 
feedProc.join() 
for p in calcProc: 
    p.start() 
for p in calcProc: 
    p.join() 
writProc.start() 
writProc.join() 

當我運行這個代碼腳本在「feedProc.start()」步驟時,從屏幕上的最後幾行輸出顯示打印語句從結束「feedProc.start()」:

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne 
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine 
**Feeder finished queing** 

但在執行下一行「feedProc.join()」前掛起。代碼沒有錯誤並繼續運行,但什麼都不做(掛起)。請告訴我我正在犯什麼錯誤。

回答

0

我實現寫作成績。從uing在Python3「map_async」功能多處理單個文件下面是我寫的函數:

def PPResults(module,alist):##Parallel processing 
    npool = Pool(int(nproc))  
    res = npool.map_async(module, alist) 
    results = (res.get())###results returned in form of a list 
    return results 

所以,我提供的「的a_list」和「模塊」參數列表該功能是在函數進行處理並返回結果。上述函數繼續以列表的形式收集結果,並在所有段落返回時返回來自'a_list'的米已經被處理。結果可能不是正確的順序,但順序對我來說並不重要,這很好。 「結果」列表中可以重複和單獨的結果寫在文件中,如:

fh_out = open('./TestResults', 'w') 
for i in results:##Write Results from list to file 
    fh_out.write(i) 

爲了保持到我在我的問題(上)中提到,我們可能需要使用「隊列」類似的結果的順序。雖然我可以修復代碼,但我相信這裏不需要提及。

感謝

AK

9

我認爲你應該將你的例子縮減爲基礎。例如:

from multiprocessing import Process, Queue 

def f(q): 
    q.put('Hello') 
    q.put('Bye') 
    q.put(None) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    with open('file.txt', 'w') as fp: 
     while True: 
      item = q.get() 
      print(item) 
      if item is None: 
       break 
      fp.write(item) 
    p.join() 

這裏我有兩個進程(主進程,一個p)。 p將字符串放入由主進程檢索的隊列中。當主進程仍然沒有找到(我使用指示哨兵:「我做了」它打破了環

擴展這一許多進程(或線程)是微不足道的

+2

你應該嘗試運行您的示例(它給出了一個錯誤)。這種方式不能在隊列中放置多個項目。實際上你只是把一個項目放在一個列表上。 – Gerrat 2013-05-08 17:27:22

+0

'TypeError:期望一個字符緩衝區對象'我有錯誤:| – nk9 2013-08-05 18:16:11

+1

@ b1- * new *(和正確的,謝謝Gerrat)版本與python 2.7.5和3.2.3一起使用。試一試! – Hernan 2013-08-07 02:08:05