2017-07-20 70 views
1

我有問題並行分配我的功能。Python多處理 - 棘手的用例,包括傳遞參數

問題描述:我有2個座標對列表,dfCdfO。對於dfC中的每個obs,我正在統計有多少dfO的半徑爲r我目前有一個工作功能,但我想看看我是否可以並行處理。

問題是這樣的:dfC可以拆分和單獨處理...但dfO需要100%的每個工人。我的方法是,讓我先把它並行工作 - 然後我會擔心如何向工人分發dfO的完整副本。除非有人能幫我解決這兩個問題?

首先,這裏設置的一切行動代碼:

import pandas as pd 
import numpy as np 
import multiprocessing as mp 
from multiprocessing import Pool, process 
import traceback 
from scipy.spatial import cKDTree 

# create 2 dataframes with random "coordinates" 
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy')) 
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk')) 

這裏是什麼dfC樣子,dfO將類似於

+----+----+ 
| x | y | 
+----+----+ 
| 35 | 5 | 
+----+----+ 
| 96 | 18 | 
+----+----+ 
| 23 | 25 | 
+----+----+ 
| 20 | 7 | 
+----+----+ 
| 74 | 54 | 
+----+----+ 

下一個例子,這裏是工作的功能等魅力。我不是單獨傳遞所有參數,而是實際上是這樣做的 - 準備一個主函數來並行地調用這些參數(並且我無法找到一種多處理方法來完成這項工作)。

# this function works on dfC, and adds a row which counts the number 
# of objects in dfO which are within radius r 
def worker_job(args): 
    try: 
     dfC, dfO, newcol, r = args 

     mxC=dfC.as_matrix() 
     mxO = dfO.as_matrix() 

     # magic tree stuff 
     C_Tree = cKDTree(mxC) 
     O_Tree = cKDTree(mxO) 

     listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0) 

     counts=[] 
     for i in listoflists: 
      counts.append(len(i)) 

     s = pd.Series(counts) 

     dfC[newcol] = s.values 

    except: 
     raise 
     traceback.print_exc() 
    else: 
     return dfC 

如果我創造我的論點是這樣的: args=[dfC,dfO,"new_column_name",3]

它完美,當我通過自身運行它: worker_job(args)

+----+----+-----------------+ 
| x | y | new_column_name | 
+----+----+-----------------+ 
| 35 | 5 |  4  | 
+----+----+-----------------+ 
| 96 | 18 |  1  | 
+----+----+-----------------+ 
| 23 | 25 |  0  | 
+----+----+-----------------+ 
| 20 | 7 |  1  | 
+----+----+-----------------+ 
| 74 | 54 |  2  | 
+----+----+-----------------+ 

現在,我嘗試建立函數,將控制並行工作者並行運行這個東西。這是我的最大努力:

# this function should control the multiprocessing 
def Run_Parallel(Function, Num_Proc, args): 
    try: 
     pool = Pool(Num_Proc) 
     parts = pool.map(Function,args) 
     pool.close() 
     pool.join() 

     results_df = pd.concat(parts) 

    except: 
     pool.close() 
     pool.terminate() 
     traceback.print_exc() 
    else: 
     return results_df 

它不會工作。 Run_Parallel(worker_job,2,args)會拋出一個關於ValueError: not enough values to unpack (expected 4, got 2)的錯誤。當它通過包裝器時,必須發生一些參數列表。

我正在尋找這個錯誤的指導,特別是,誰知道如何解決更大的問題 - 這是我需要我的池包含100%的dfO和只是dfC子集的效率。

+1

'Pool.map'預計的'iterable'。所以你必須把你的'args'列表放在另一個列表中,然後傳遞給'map'函數。您可能會注意到它與直接調用'worker_job'函數沒有區別。所以你不得不重構你的程序。 – Himal

回答

1

答案是將參數作爲列表列表傳遞。這也解決了另一個分裂數據框的問題(我認爲pool默認情況下會處理這個問題,但它不會)。

正確的函數應該是這樣的:

# this function should control the multiprocessing 
def Run_Parallel(Function, Num_Proc, args): 
    dfC, dfO, newcol, r = args 

    # to make lists of lists 
    argslist=[] 
    dfOlist=[] 
    dfClist=[] 
    resultlist=[] 

    # split dfC into parts 
    Cparts=np.array_split(dfC, Num_Proc) 

    # build the lists 
    for i in range(Num_Proc): 
     argslist.append([Cparts[i],dfO,newcol,r]) 


    try: 
     pool = Pool(Num_Proc) 
     parts = pool.map(Function,argslist) 
     pool.close() 
     pool.join() 

     results_df = pd.concat(parts) 

    except: 
     pool.close() 
     pool.terminate() 
     traceback.print_exc() 
    else: 
     return results_df