1

我想在Python中使用multiprocessing模塊並行化df.corr()。我正在計算一列,並計算一個進程中所有列的剩餘值和另一個進程中剩餘其他列的第二列的相關值。我以這種方式繼續填充相關矩陣的上層特徵,將所有過程的結果行疊加起來。Python平行關聯比單進程關聯慢

我拿了形狀(678461, 210)的樣本數據,試了我的並行方法和df.corr(),分別得到了運行時間爲214.40s42.64s。所以,我的並行化方法需要更多時間。

有沒有辦法改善這一點?

import multiprocessing as mp 
import pandas as pd 
import numpy as np 
from time import * 

def _correlation(args): 

    i, mat, mask = args 
    ac = mat[i] 

    arr = [] 

    for j in range(len(mat)): 
     if i > j: 
      continue 

     bc = mat[j] 
     valid = mask[i] & mask[j] 
     if valid.sum() < 1: 
      c = NA  
     elif i == j: 
      c = 1. 
     elif not valid.all(): 
      c = np.corrcoef(ac[valid], bc[valid])[0, 1] 
     else: 
      c = np.corrcoef(ac, bc)[0, 1] 

     arr.append((j, c)) 

    return arr 

def correlation_multi(df): 

    numeric_df = df._get_numeric_data() 
    cols = numeric_df.columns 
    mat = numeric_df.values 

    mat = pd.core.common._ensure_float64(mat).T 
    K = len(cols) 
    correl = np.empty((K, K), dtype=float) 
    mask = np.isfinite(mat) 

    pool = mp.Pool(processes=4) 

    ret_list = pool.map(_correlation, [(i, mat, mask) for i in range(len(mat))]) 

    for i, arr in enumerate(ret_list): 
     for l in arr: 
      j = l[0] 
      c = l[1] 

      correl[i, j] = c 
      correl[j, i] = c 

    return pd.DataFrame(correl, index = cols, columns = cols) 

if __name__ == '__main__': 
    noise = pd.DataFrame(np.random.randint(0,100,size=(100000, 50))) 
    noise2 = pd.DataFrame(np.random.randint(100,200,size=(100000, 50))) 
    df = pd.concat([noise, noise2], axis=1) 

    #Single process correlation  
    start = time() 
    s = df.corr() 
    print('Time taken: ',time()-start) 

    #Multi process correlation 
    start = time() 
    s1 = correlation_multi(df) 
    print('Time taken: ',time()-start) 

回答

0

_correlation結果必須從工作進程到進程中運行通過進程間通信Pool移動。

這意味着返回數據被醃製,發送到另一個進程,取消選擇並添加到結果列表中。 這需要時間並且本質上是一個連續的過程。

map以它們發送的順序IIRC處理退貨。因此,如果一次迭代需要相對較長的時間,其他結果可能會停滯不前。您可以嘗試使用imap_unordered,一旦它們到達,就會產生結果。