2010-08-27 135 views
6

這裏就是我試圖完成 -蟒蛇 - >多模塊

  1. 我有一百萬個文件,我需要解析&追加解析的內容到一個文件中。
  2. 由於單個過程需要很長時間,因此此選項不可用。
  3. 在Python中不使用線程,因爲它實質上涉及運行單個進程(由於GIL)。
  4. 因此使用多處理模塊。即產生4個子過程,以利用所有原始核心功率:)

到目前爲止好,現在我需要一個共享對象,所有的子進程都可以訪問。我正在使用多處理模塊中的隊列。此外,所有子流程都需要將其輸出寫入單個文件。一個潛在的地方使用鎖我猜。有了這個設置,當我運行時,我沒有得到任何錯誤(所以父進程看起來很好),它只是停滯不前。當我按ctrl-C時,我看到一個回溯(每個子進程一個)。也沒有輸出寫入輸出文件。這裏的代碼(注意,沒有多進程運行良好) -

import os 
import glob 
from multiprocessing import Process, Queue, Pool 

data_file = open('out.txt', 'w+') 

def worker(task_queue): 
    for file in iter(task_queue.get, 'STOP'): 
     data = mine_imdb_page(os.path.join(DATA_DIR, file)) 
     if data: 
      data_file.write(repr(data)+'\n') 
    return 

def main(): 
    task_queue = Queue() 
    for file in glob.glob('*.csv'): 
     task_queue.put(file) 
    task_queue.put('STOP') # so that worker processes know when to stop 

    # this is the block of code that needs correction. 
    if multi_process: 
     # One way to spawn 4 processes 
     # pool = Pool(processes=4) #Start worker processes 
     # res = pool.apply_async(worker, [task_queue, data_file]) 

     # But I chose to do it like this for now. 
     for i in range(4): 
      proc = Process(target=worker, args=[task_queue]) 
      proc.start() 
    else: # single process mode is working fine! 
     worker(task_queue) 
    data_file.close() 
    return 

我做錯了什麼?我也嘗試在生成時將打開的file_object傳遞給每個進程。但沒有效果。例如 - Process(target=worker, args=[task_queue, data_file])。但是這並沒有改變任何東西。我覺得由於某種原因,子進程無法寫入文件。要麼file_object的實例沒有得到複製(在產卵時)或其他一些怪癖......任何人有一個想法?

另外:也有任何方法來保持一個持久的mysql_connection打開&傳遞給子進程嗎?所以我打開我的父進程中的mysql連接&打開的連接應該可以訪問我所有的子進程。基本上這相當於python中的shared_memory。這裏的任何想法?

+0

如果你不寫入文件,但做一個打印,它的工作呢? (在Linux上我會做python script.py> out.dat以防止屏幕氾濫)。 – extraneon 2010-08-27 17:37:49

+1

我認爲proc.start是非阻塞的,因此您可能應該等待某個地方讓流程有機會在做datafile.close() – extraneon 2010-08-27 17:42:30

+0

data_file.close()的最後完成一些工作。它應該在這裏有效嗎?另外打印工作正常。當我使用print時,我在屏幕上看到輸出...但是我想使用文件。幫幫我! 也有沒有辦法保持一個持久的mysql_connection打開並傳遞給子進程? – 2010-08-27 17:47:00

回答

4

儘管與Eric的討論富有成效,但後來我發現了一個更好的方法。在多處理模塊中有一個名爲「Pool」的方法,它非常適合我的需求。

它優化自己的核心數量我的系統。即只有當許多過程產生時纔會產生。的核心。當然這是可定製的。所以這是代碼。稍後可以幫助他人 -

from multiprocessing import Pool 

def main(): 
    po = Pool() 
    for file in glob.glob('*.csv'): 
     filepath = os.path.join(DATA_DIR, file) 
     po.apply_async(mine_page, (filepath,), callback=save_data) 
    po.close() 
    po.join() 
    file_ptr.close() 

def mine_page(filepath): 
    #do whatever it is that you want to do in a separate process. 
    return data 

def save_data(data): 
    #data is a object. Store it in a file, mysql or... 
    return 

仍然會經歷這個龐大的模塊。不確定save_data()是由父進程執行還是由生成的子進程使用此函數。如果是保存的孩子,在某些情況下可能會導致併發問題。如果任何人有使用這個模塊的經驗,你在這裏感謝更多的知識...

3

爲多的文檔表明進程之間的共享狀態的幾種方法:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

我敢肯定,每個進程都有一個新的解釋,然後目標(功能)和args被裝進去。在這種情況下,腳本中的全局命名空間將被綁定到你的工作函數,所以data_file將會在那裏。但是,我不確定文件描述符在複製時發生了什麼。你有沒有嘗試傳遞文件對象作爲參數之一?

另一種方法是通過另一個隊列來保存工人的結果。工作人員put的結果和主碼get的結果並將其寫入文件。

+0

是啊!我可以做到。我可以有另一個Queue,它們就像進程寫入的out_queue一樣。由於父進程有權訪問它可以繼續讀取該隊列並寫入文件。這可以工作! 此外,我嘗試傳遞文件對象作爲參數之一。它似乎沒有工作。線程不寫入文件。 也埃裏克,任何想法如何傳遞一個持久的mysql連接到子進程? – 2010-08-27 18:13:49

+0

@Srikar,希望有所幫助。至於mysql連接,我不確定那個。我會說你最好每個過程都有獨立的連接。即使你可以建立連接,我也不確定它是如何「線程安全」的。如果你真的必須分享一個,那麼你可能不得不做一些奇怪的事情。然後再次,您可以在Queue中代理連接的查詢/響應機制。然後主進程(或單獨的mysql處理程序進程)從隊列中獲取查詢,運行它們,並將結果返回......或類似的東西。 – 2010-08-27 18:39:34