2012-02-28 77 views
3

我需要爲我的數據庫的每個元素運行一個函數。不能grok python多處理

當我嘗試以下方法:

from multiprocessing import Pool 
from pymongo import Connection 

def foo(): 
... 


connection1 = Connection('127.0.0.1', 27017) 
db1 = connection1.data 

my_pool = Pool(6) 
my_pool.map(foo, db1.index.find()) 

,我發現了以下錯誤:

Job 1, 'python myscript.py ' terminated by signal SIGKILL (Forced quit)

這是,我認爲,造成db1.index.find()吃所有可用的RAM,而試圖返回數以百萬計的數據庫元素...

我該如何修改我的代碼才能正常工作?

一些日誌會在這裏:

dmesg | tail -500 | grep memory 
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child 
[177891.001379] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250 
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child 
[177891.025399] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250 

實際以下功能:

def create_barrel(item): 
    connection = Connection('127.0.0.1', 27017) 
    db = connection.data 
    print db.index.count() 
    barrel = [] 
    fls = [] 
    if 'name' in item.keys(): 
     barrel.append(WhitespaceTokenizer().tokenize(item['name'])) 
     name = item['name'] 
    elif 'name.utf-8' in item.keys(): 
     barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8'])) 
     name = item['name.utf-8'] 
    else: 
     print item.keys() 
    if 'files' in item.keys(): 
     for file in item['files']: 
      if 'path' in file.keys(): 
       barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path']))) 
       fls.append(("\\".join(file['path']),file['length'])) 
      elif 'path.utf-8' in file.keys(): 
       barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8']))) 
       fls.append(("\\".join(file['path.utf-8']),file['length'])) 
      else: 
       print file 
       barrel.append(WhitespaceTokenizer().tokenize(file)) 
    if len(fls) < 1: 
     fls.append((name,item['length'])) 
    barrel = sum(barrel,[]) 
    for s in barrel: 
     vs = re.findall("\d[\d|\.]*\d", s) #versions i.e. numbes such as 4.2.7500 
    b0 = [] 
    for s in barrel: 
     b0.append(re.split("[" + string.punctuation + "]", s)) 
    b1 = filter(lambda x: x not in string.punctuation, sum(b0,[])) 
    flag = True 
    while flag: 
     bb = [] 
     flag = False 
     for bt in b1: 
      if bt[0] in string.punctuation: 
       bb.append(bt[1:]) 
       flag = True 
      elif bt[-1] in string.punctuation: 
       bb.append(bt[:-1]) 
       flag = True 
      else: 
       bb.append(bt) 
     b1 = bb 
    b2 = b1 + barrel + vs 
    b3 = list(set(b2)) 
    b4 = map(lambda x: x.lower(), b3) 
    b_final = {} 
    b_final['_id'] = item['_id'] 
    b_final['tags'] = b4 
    b_final['name'] = name 
    b_final['files'] = fls 
    print db.barrels.insert(b_final) 

我已經注意到有趣的事情。然後我按Ctrl + C停止的過程,我發現了以下內容:

python index2barrel.py 
Traceback (most recent call last): 
    File "index2barrel.py", line 83, in <module> 
    my_pool.map(create_barrel, db1.index.find, 6) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async 
    iterable = list(iterable) 
TypeError: 'instancemethod' object is not iterable 

我的意思是,爲什麼多是試圖事端轉換到列表中?這不是問題的根源嗎?

從堆棧跟蹤:

brk(0x231ccf000)      = 0x231ccf000 
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1 
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43 
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663 
... [manymany times] 
brk(0x2320d5000)      = 0x2320d5000 
.... manymany times 

將上述樣品進入並進入在strace的輸出,由於某種原因strace的-o日誌文件蟒myscript.py 不會暫停。它只吃所有可用的RAM並寫入日誌文件。

UPDATE。使用imap而不是map解決了我的問題。

+0

這是Linux,是嗎?系統日誌中是否有任何內容顯示OOM殺手終止了您的流程? – cha0site 2012-02-28 08:40:20

+0

@ cha0site是的,這是Ubuntu的。我將檢查日誌,但是從htop中我看到內存(所有8 GB!)在幾秒鐘內就被吃掉了,所以我認爲問題在於映射(...)試圖將整個數據庫放入列表中。不要如何避免這種情況。 – Moonwalker 2012-02-28 08:50:02

+0

我想我們可能需要看看'foo()'做了什麼。 'find()'返回一個'Cursor',它不應該將所有記錄存儲在內存中...... – cha0site 2012-02-28 11:32:24

回答

2

由於find()操作返回光標的地圖功能,因爲你說當你做這個運行沒有問題 for item in db1.index.find(): create_barrel(item) 它看起來像功能是OK。

你可以嘗試限制遊標中返回結果的數量,看看這是否有幫助?我覺得語法是:

db1.index.find().limit(100) 

如果你可以試試這個,看看它是否有助於它可能有助於獲取問題的原因。編輯1:我認爲你是通過使用map函數去錯誤的方式 - 我認爲你應該在mongo python驅動程序中使用map_reduce - 這樣map函數將由mongod進程執行。

+0

嗯,那有效。我需要循環迭代循環和skip(n * 100)或者其他東西,但是man,這個解決方案感覺有點骯髒:|我的意思是,我已經有了map函數,爲什麼要使用循環... – Moonwalker 2012-03-01 05:20:11

+1

我猜您可以嘗試添加 '__len__' 返回 'def __len __(self):' '#__len__ is deprecated(replace with size())並將被刪除。 # #這個棄用的原因有點複雜: #list(...)調用_PyObject_LengthHint來猜測返回列表需要多少空間 #。該方法又調用__len__。因此,如果我們保留__len__,那麼在Cursor實例上調用列表(...)將至少需要 #兩次往返數據庫 - 這使得它的大約兩倍於光標處的[x for x],爲 #,這對用戶來說並不明顯。' – Gregor 2012-03-01 11:01:28

+0

我試着添加'__len__',你和Lycha建議,但它失敗了'Traceback(最近調用最後):文件「index2barrel.py」,第86行,在 my_pool .map(create_barrel,cursor,chunksize = 10)文件「/usr/lib/python2.7/multiprocessing/pool.py」,第227行,在map中返回self.map_async(func,iterable,chunksize).get()File 「/usr/lib/python2.7/multiprocessing/pool.py」,第286行,在map_async中if len(iterable)== 0:TypeError:'Cursor'類型的對象沒有len()'我試過自己添加len函數,但沒有運氣。 – Moonwalker 2012-03-02 08:26:35

1

map()函數給出了給定函數塊中的項。默認情況下此CHUNKSIZE計算如下(link to source):

chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 

這可能導致過大的塊大小,你的情況,並讓內存耗盡的過程。嘗試手動設置塊的大小是這樣的:

my_pool.map(foo, db1.index.find(), 100) 

編輯:你也應該考慮重新使用的數據庫連接,使用後關閉它們。現在您爲每個項目創建新的數據庫連接,並且您不要爲它們調用close()

編輯2:同時檢查是否while循環進入無限循環(將解釋症狀)。

EDIT3:根據你添加的回溯函數,map函數試圖將光標轉換成列表,導致所有的項目一次被提取。發生這種情況是因爲它想要查找集合中有多少物品。這是pool.pymap()部分代碼:

if not hasattr(iterable, '__len__'): 
    iterable = list(iterable) 

你可以試試這個,避免轉換到列表:

cursor = db1.index.find() 
cursor.__len__ = cursor.count() 
my_pool.map(foo, cursor) 
+0

我試過這個(甚至與塊大小= 6)並得到了同樣的問題:python開始額外的進程(在我的例子中達到指定的數量,即6),這些進程正在吃ram,直到沒有ram離開,所有的事情都停下來。 當我運行這個函數時,for循環內存消耗幾乎不明顯。 – Moonwalker 2012-02-28 14:43:33

+0

@Moonwalker我又添加了一個關於db連接使用的註釋。 – Lycha 2012-02-28 14:50:44

+0

我甚至從函數中刪除數據庫連接,看看它是否有效,但沒有運氣:( 看來,該函數什麼都不做,沒有一個元素被寫入數據庫... – Moonwalker 2012-02-28 15:26:01