2013-02-03 54 views
22

我正在使用multiprocessing.Pool()來並行化一些繁重的計算。使用大數據進行多處理

目標函數返回大量數據(一個巨大的列表)。我用完了RAM。

沒有multiprocessing,我只是將目標函數轉換爲一個生成器,然後將生成的元素一個接一個地計算出來。

我知道多處理不支持生成器 - 它等待整個輸出並立即返回,對吧?沒有屈服。有沒有辦法讓Pool工作人員在數據可用時立即生成數據,而無需在RAM中構建整個結果數組?

簡單的例子:

def target_fnc(arg): 
    result = [] 
    for i in xrange(1000000): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     for element in result: 
      yield element 

這是Python 2.7版。

回答

15

這聽起來像一個理想的使用情況下的隊列:http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

只需從彙集的工人養活你的結果到隊列中,在主嚥下。

請注意,您仍然可能會遇到內存壓力問題,除非您排隊的速度幾乎與工作人員填充隊列一樣快。您可以限制隊列大小(適合隊列的最大對象數),在這種情況下,池隊員將在queue.put語句上阻塞,直到隊列中的空間可用。這會給內存使用帶來上限。 但是如果您正在這樣做,現在可能是時候重新考慮您是否需要集中和/或是否可以使用較少的工人。

+1

隊列將傳遞pickle數據。所以數據 - > pickle-> unpickle->新的數據副本。這會減慢程序速度並使用更多的額外RAM。應該考慮使用共享內存。 – Wang

3

如果您的任務可以以塊的形式返回數據......它們可以分解爲更小的任務,每個任務都返回一個塊?顯然,這並不總是可能的。如果不是,則必須使用其他機制(如Loren Abrams所建議的Queue)。但是,當它,這可能是一個更好的解決方案,其他原因,以及解決這個問題。

以你爲例,這當然是可行的。例如:

def target_fnc(arg, low, high): 
    result = [] 
    for i in xrange(low, high): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    pool_args = [] 
    for low in in range(0, 1000000, 10000): 
     pool_args.extend(args + [low, low+10000] for args in some_args) 
    for result in pool.imap_unordered(target_fnc, pool_args): 
     for element in result: 
      yield element 

(你當然可以取代嵌套理解的循環,或zipflatten,如果你喜歡。)

所以,如果some_args[1, 2, 3],你會得到300任務-[[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …],其中每一個只返回10000個元素而不是1000000.

3

從您的描述來看,這聽起來像您對處理數據沒有太大興趣,因爲它們避免傳遞百萬個元素list回來。

有一個更簡單的方法來做到這一點:只需將數據放入一個文件。例如:

def target_fnc(arg): 
    fd, path = tempfile.mkstemp(text=True) 
    with os.fdopen(fd) as f: 
     for i in xrange(1000000): 
      f.write('dvsdbdfbngd\n') 
    return path 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     with open(result) as f: 
      for element in f: 
       yield element 

顯然,如果您的結果可能包含換行符,或者不是字符串,等等,你需要使用一個csv文件,numpy,等等,而不是一個簡單的文本文件,但這個想法是一樣的。這就是說,即使這更簡單,一次處理數據塊通常會帶來好處,所以如果分解任務或使用Queue(如其他兩個答案所示)可能會更好,如果缺點(分別需要一種方法來打破任務,或者必須能夠像生產數據一樣快地使用數據)不是交易破壞者。