2015-04-02 154 views
2

更新:在dano的幫助下,我解決了這個問題。如何在多處理進程完成後退出python腳本?

我沒有調用生產者join(),它使我的腳本掛起。 只需要添加一個行達諾說:

... 
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
producer.daemon = True 
producer.start() 
... 

舊腳本:

import multiprocessing 
import Queue 

QUEUE_SIZE = 2000 


def produce(file_queue, row_queue,): 

    while not file_queue.empty(): 
     src_file = file_queue.get() 
     zip_reader = gzip.open(src_file, 'rb') 

     try: 
      csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER) 

      for row in csv_reader: 
       new_row = process_sdp_row(row) 
       if new_row: 
        row_queue.put(new_row) 
     finally: 
      zip_reader.close() 


def consume(row_queue): 
    '''processes all rows, once queue is empty, break the infinit loop''' 
    while True: 
     try: 
      # takes a row from queue and process it 
      pass 
     except multiprocessing.TimeoutError as toe: 
      print "timeout, all rows have been processed, quit." 
      break 
     except Queue.Empty: 
      print "all rows have been processed, quit." 
      break 
     except Exception as e: 
      print "critical error" 
      print e 
      break 


def main(args): 

    file_queue = multiprocessing.Queue() 
    row_queue = multiprocessing.Queue(QUEUE_SIZE) 

    file_queue.put(file1) 
    file_queue.put(file2) 
    file_queue.put(file3) 

    # starts 3 producers 
    for i in xrange(4): 
     producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
     producer.start() 

    # starts 1 consumer 
    consumer = multiprocessing.Process(target=consume,args=(row_queue,)) 
    consumer.start() 

    # blocks main thread until consumer process finished 
    consumer.join() 

    # prints statistics results after consumer is done 

    sys.exit(0) 


if __name__ == "__main__": 
    main(sys.argv[1:]) 

目的

我使用python 2.7multiprocessing生成3個生產者讀取3個文件的同時,然後將文件行放入row_queue中,並生成1位消費者對所有行執行更多處理。打印統計結果在消費者完成後在主線程中產生,所以我使用join()方法。最後調用sys.exit(0)來退出腳本。

問題: 無法退出腳本。

我試圖用print "the end"代替sys.exit(0),在控制檯上顯示「結束」。難道我做錯了什麼?爲什麼腳本不會退出,以及如何讓它退出?由於

+1

這裏沒有足夠的信息可以肯定地說,但最可能的問題是一個或多個'Producer'進程仍在運行。嘗試通過在調用'producer之前添加'producer.daemon = True'來使它們守護進程。start()',看腳本是否會退出。 – dano 2015-04-02 16:35:11

+0

或者,爲每個生產者調用'join()'以及...假設它們通常會終止。 – twalberg 2015-04-02 16:40:13

+0

@twalberg如果它們最終結束,腳本將不需要調用'join()'而退出。當解釋器退出時,Python隱式地在非守護進程子進程上調用join()。 – dano 2015-04-02 16:41:34

回答

0

producers沒有multiprocessing.Process.daemon屬性格式設置:

守護

進程的守護進程的標誌,一個布爾值。這必須在start()被調用之前設置。

初始值從創建過程繼承。

當進程退出時,它將嘗試終止其所有守護進程的子進程。

請注意,不允許守護進程創建子進程。否則,守護進程會在其父進程退出時終止其子進程。另外,這些不是Unix守護進程或服務,它們是正常的進程,如果非守護進程退出,它們將被終止(而不是加入)。

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.daemon

只需添加producer.daemon = True

... 
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
producer.daemon = True 
producer.start() 
... 

這應該有可能使當consumer被加入整個程序結束。

順便說一下,你應該也可能是join的製作者。

+0

如果生產者沒有自己退出,調用'join'將會使程序掛起。他需要修復那些阻止他們退出以便調用'join'不會導致掛起的情況,在這種情況下,不需要首先使他們成爲守護進程。 – dano 2015-04-02 16:57:07

+0

@dano是的,你是對的。但是由於我們不知道完整的「產品」功能是怎樣的,我只是試圖保持簡單。我傾向於爲我的'Processes'使用毒丸,然後加入它們,但我也將它們設置爲'daemon = True',所以當父母在發出毒丸之前意外退出時,它們不會掛起。但隨意downvote .. – 2015-04-02 17:01:08

+0

@dano @Jan Spurny嗨,大家好,我通過我的'produce()'方法,你能檢查出來嗎? – haifzhan 2015-04-02 17:08:40