2015-10-14 68 views
-1

我正在使用python多進程庫來處理一組進程中的信息。這些過程還包含進一步劃分必須完成的工作量的過程。有一個Manager.Queue積累了所有消耗數據的進程的結果。Python進程在IO完成之前終止

在python腳本的主線程中。我試圖使用連接來阻塞主線程,直到我們可以合理確定所有子進程是否完成,然後將輸出寫入單個文件。但是,在所有數據寫入文件之前,系統會終止並關閉文件。

以下代碼是上述解決方案實現的簡化提取。 用於inQueues隊列: queue.join()

for p in processes: 
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file: 
    out_file.write("Algorithm,result\n") 
    while not out_queue.empty(): 
     res = out_queue.get() 
     out_file.write(res['algorithm'] + ","+res['result']+"\n") 
     out_queue.task_done() 
     time.sleep(0.05) 
    out_queue.join() 
    out_file.close() 

的out_queue.qsize()將打印過量的500個記錄可用,但是隻有100將被打印到該文件。 同樣在這一點上,如果500條記錄是系統生成的總數,我不能100%確定,但只是此時報告的數字。

如何確保將所有結果寫入results.csv文件?

+0

[QSIZE()](http://bugs.python.org/issue17985):「返回隊列的近似大小由於。多線程/多處理語義, 是不可靠的。「 – kay

+0

我知道,由qsize方法指示的隊列大小可能會發生變化,但代碼段是整個程序中從隊列中刪除的唯一部分,因此預計打印的記錄數不會小於隊列的大小(這是當前發生的)。 – kyleED

回答

0

不要等到所有的進程完成你消耗的數據之前,但處理在同一時間的數據,並記住其進程仍在運行:

processes = [] 

"""start processes and append them to processes""" 

while True: 
    try: 
     # get an item 
     item = queue.get(True, 0.5) 
    except Queue.Empty: 
     # no item received in half a second 
     if not processes: 
      # there are no more processes and nothing left to process 
      break 
     else: 
      proc_num = 0 
      while proc_num < len(processes): 
       process = processes[proc_num] 
       exit_code = process.poll() 
       if exit_code is None: 
        # process is still running, proceed to next 
        proc_num += 1 
       elif exit_code == 0: 
        # process ended gracefully, remove it from list 
        processes.pop(proc_num) 
       else: 
        # process ended with an error, what now? 
        raise Exception('Her last words were: "%r"' % exit_code) 
    else: 
     # got an item 
     """process item""" 

,不測試,如果processes是空的外Queue.Empty或您將有races

,不過也許你會更開心一higher level function

pool = multiprocessing.Pool(8) 
items = pool.map_async(producer_function, producer_arguments) 
for item in items: 
    """process item"""