2016-11-22 43 views
5

我目前正在嘗試讀取大型文件(8000萬行),我需要爲每個條目進行計算密集型矩陣乘法。計算完成後,我想將結果插入到數據庫中。由於這個過程需要時間密集的方式,我想將文件分割到多個核心上以加速進程。Python:使用多個內核的進程文件

經過研究,我發現這個有前途的嘗試,它將文件拆分成n部分。

def file_block(fp, number_of_blocks, block): 
    ''' 
    A generator that splits a file into blocks and iterates 
    over the lines of one of the blocks. 

    ''' 

    assert 0 <= block and block < number_of_blocks 
    assert 0 < number_of_blocks 

    fp.seek(0,2) 
    file_size = fp.tell() 

    ini = file_size * block/number_of_blocks 
    end = file_size * (1 + block)/number_of_blocks 

    if ini <= 0: 
     fp.seek(0) 
    else: 
     fp.seek(ini-1) 
     fp.readline() 

    while fp.tell() < end: 
     yield fp.readline() 

迭代,你可以這樣調用該函數:

if __name__ == '__main__': 
    fp = open(filename) 
    number_of_chunks = 4 
    for chunk_number in range(number_of_chunks): 
     print chunk_number, 100 * '=' 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

雖然這工作,我遇到問題,並行這種利用多:

fp = open(filename) 
number_of_chunks = 4 
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)] 

p = Pool(cpu_count() - 1) 
p.map(processChunk,li) 

與錯誤之中,發電機不能醃製。

雖然我明白這個錯誤,但首先遍歷整個文件以將所有行放入列表中的代價太昂貴了。

此外,我想用每迭代芯線的塊,因爲它是更有效的(如果使用的典型地圖的方法,而不是1由1)至多行插入到數據庫中,在一次

由於您的幫助。

+3

您可以對大文件進行初始傳遞,以記錄搜索座標以及從該位置讀取的行數。然後你可以用這兩個數字來調用你的多處理器,並在每個進程中保留髮生器。 – kezzos

+0

是否有可能先將文件分成四個文件? – cwallenpoole

+0

將文件打開和'file_block'代碼移入每個線程,而不是在線程啓動之前嘗試初始化它。將文件打開4次而不是隻打開一次,只要它是隻讀的即可。 –

回答

3

不是先創建生成器並將它們傳遞到每個線程,而是將其留給線程代碼。

def processChunk(params): 
    filename, chunk_number, number_of_chunks = params 
    with open(filename, 'r') as fp: 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)] 
p.map(processChunk, li)