我試圖實現類似df.apply的函數,但跨數據塊的並行化。我寫了下面的測試代碼,看看我是多麼可能獲得(與數據複製等):對熊貓數據幀的多處理。基於輸入大小的混淆行爲
from multiprocessing import Pool
from functools import partial
import pandas as pd
import numpy as np
import time
def df_apply(df, f):
return df.apply(f, axis=1)
def apply_in_parallel(df, f, n=5):
pool = Pool(n)
df_chunks = np.array_split(df, n)
apply_f = partial(df_apply, f=f)
result_list = pool.map(apply_f, df_chunks)
return pd.concat(result_list, axis=0)
def f(x):
return x+1
if __name__ == '__main__':
N = 10^8
df = pd.DataFrame({"a": np.zeros(N), "b": np.zeros(N)})
print "parallel"
t0 = time.time()
r = apply_in_parallel(df, f, n=5)
print time.time() - t0
print "single"
t0 = time.time()
r = df.apply(f, axis=1)
print time.time() - t0
怪異的行爲: 爲N = 10^7它的工作原理 爲N = 10^8它給了我一個錯誤
Traceback (most recent call last):
File "parallel_apply.py", line 27, in <module>
r = apply_in_parallel(df, f, n=5)
File "parallel_apply.py", line 14, in apply_in_parallel
result_list = pool.map(apply_f, df_chunks)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 227, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 528, in get
raise self._value
AttributeError: 'numpy.ndarray' object has no attribute 'apply'
有沒有人知道這裏發生了什麼? 我也很感謝這種並行化的反饋。我期待的功能需要花費更多的時間,而不是每個行和數百萬行的總和或總和。
謝謝!