2014-10-31 21 views
1

我試圖實現類似df.apply的函數,但跨數據塊的並行化。我寫了下面的測試代碼,看看我是多麼可能獲得(與數據複製等):對熊貓數據幀的多處理。基於輸入大小的混淆行爲

from multiprocessing import Pool 
from functools import partial 
import pandas as pd 
import numpy as np 
import time 

def df_apply(df, f): 
    return df.apply(f, axis=1) 

def apply_in_parallel(df, f, n=5): 
    pool = Pool(n) 
    df_chunks = np.array_split(df, n) 
    apply_f = partial(df_apply, f=f) 
    result_list = pool.map(apply_f, df_chunks) 
    return pd.concat(result_list, axis=0) 

def f(x): 
    return x+1 

if __name__ == '__main__': 
    N = 10^8 
    df = pd.DataFrame({"a": np.zeros(N), "b": np.zeros(N)}) 

    print "parallel" 
    t0 = time.time() 
    r = apply_in_parallel(df, f, n=5) 
    print time.time() - t0 

    print "single" 
    t0 = time.time() 
    r = df.apply(f, axis=1) 
    print time.time() - t0 

怪異的行爲: 爲N = 10^7它的工作原理 爲N = 10^8它給了我一個錯誤

Traceback (most recent call last): 
    File "parallel_apply.py", line 27, in <module> 
    r = apply_in_parallel(df, f, n=5) 
    File "parallel_apply.py", line 14, in apply_in_parallel 
    result_list = pool.map(apply_f, df_chunks) 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 227, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 528, in get 
    raise self._value 
AttributeError: 'numpy.ndarray' object has no attribute 'apply' 

有沒有人知道這裏發生了什麼? 我也很感謝這種並行化的反饋。我期待的功能需要花費更多的時間,而不是每個行和數百萬行的總和或總和。

謝謝!

回答

1

array_split接受任何陣列狀參數(包括pandas.DataFrame對象),而僅返回保證它返回一個numpy.ndarray(其DataFrames是)。當然,ndarrays沒有apply方法,這正是您所看到的錯誤。我真的很驚訝,這在任何情況下都有效。您可能需要將數據幀拆分爲子框架或應用在ndarrays上運行的功能。

1

N = 10^8導致2N = 10^7結果13,因爲操作者^是XOR(未功率)。所以一個2行長的df不能被分成5個塊。改用此:N = 10**4N = 10**5。有了這些值,你會看到時間上的差異。注意大於N = 10**6的值(在此值時平行時間約爲30秒,單次約爲167秒)。並在apply_in_parallel()末尾(return之前)使用pool.close()自動關閉池中的所有工人。