2017-04-05 134 views
2

我有一個文件列表,我將其傳遞給for循環並執行一大堆函數。什麼是最簡單的方法來並行呢?不知道我可以在任何地方找到這個確切的東西,我認爲我目前的實施是不正確的,因爲我只看到一個文件正在運行。從我所做的一些閱讀中,我認爲這應該是一個完全平行的例子。使用for循環的多處理池

舊代碼是這樣的:

import pandas as pd 
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv'] 
for file in filenames: 
    file1 = pd.read_csv(file) 
    print('running ' + str(file)) 
    a = function1(file1) 
    b = function2(a) 
    c = function3(b) 
    for d in range(1,6): 
      e = function4(c, d) 
    c.to_csv('output.csv') 

(錯誤地)並行代碼

import pandas as pd 
from multiprocessing import Pool 
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv'] 
def multip(filenames): 
    file1 = pd.read_csv(file) 
    print('running ' + str(file)) 
    a = function1(file1) 
    b = function2(a) 
    c = function3(b) 
    for d in range(1,6): 
      e = function4(c, d) 
    c.to_csv('output.csv') 

if __name__ == '__main__' 
    pool = Pool(processes=4) 
    runstuff = pool.map(multip(filenames)) 

(認爲)我想要做的是有一個文件是計算每個核心(可能是每個進程?)。我也沒有

multiprocessing.cpu_count() 

並得到8(我有一個四,所以它可能考慮到線程)。由於我總共有大約10個文件,如果我可以在每個進程中放置一個文件來加快速度,那將非常棒!我希望剩下的2個文件能夠在第一輪的流程完成後找到一個流程。

編輯: 爲了進一步的清楚起見,函數(即函數1,函數2等)也在它們各自的文件內被饋入其他函數(即函數1a,函數1b)。我使用import語句調用函數1。

我得到以下錯誤:

OSError: Expected file path name or file-like object, got <class 'list'> type 

顯然不喜歡被通過的名單,但我不想做文件名[0] if語句,因爲只運行一個文件

回答

1
import multiprocessing 
names = ['file1.csv', 'file2.csv'] 
def multip(name): 
    [do stuff here] 

if __name__ == '__main__': 
    #use one less process to be a little more stable 
    p = multiprocessing.Pool(processes = multiprocessing.cpu_count()-1) 
    #timing it... 
    start = time.time() 
    for file in names: 
    p.apply_async(multip, [file]) 

    p.close() 
    p.join() 
    print("Complete") 
    end = time.time() 
    print('total time (s)= ' + str(end-start)) 

編輯︰交換出if__name __ =='____main___'爲這一個。這運行所有文件:

if __name__ == '__main__': 

    p = Pool(processes = len(names)) 
    start = time.time() 
    async_result = p.map_async(multip, names) 
    p.close() 
    p.join() 
    print("Complete") 
    end = time.time() 
    print('total time (s)= ' + str(end-start))