2017-08-05 143 views
1

如何通過numpy.apply_along_axis()將NumPy數組元素的函數應用並行化以利用多核?這似乎是一件很自然的事情,在所有對所使用函數的調用都是獨立的情況下。numpy.apply_along_axis()的簡單並行化?

在我的特殊情況下(如果這很重要),應用軸是軸0:np.apply_along_axis(func, axis=0, arr=param_grid)np是NumPy)。

我在有一個快速瀏覽一下Numba,但我似乎無法得到這個並行,與像一個循環:

@numba.jit(parallel=True) 
result = np.empty(shape=params.shape[1:]) 
for index in np.ndindex(*result.shape)): # All the indices of params[0,...] 
    result[index] = func(params[(slice(None),) + index]) # Applying func along axis 0 

還有顯然是編譯選項在與NumPy進行並行通過OpenMP,但似乎無法通過MacPorts訪問。

人們也可能會想到可能會在幾塊中切割陣列,並使用線程(以避免複製數據)並將每個塊上的函數並行應用。這比我所尋找的要複雜得多(如果Global Interpreter Lock沒有足夠的發佈版本,這可能不起作用)。

能夠以簡單的方式使用多個內核對於簡單的可並行化任務(比如將一個函數應用於數組的所有元素(這基本上就是這裏所需要的) func()取一維數組參數)。

+0

'apply_along_axis'是純粹的Python代碼,除了將感興趣的軸轉置到最後,並且對其餘的部分執行'ndindex(arr.shape [: - 1])'以外,您所做的只是顯示。替代方法已經在像https://stackoverflow.com/questions/45067268/numpy-vectorized-2d-array-operation-error – hpaulj

+0

這樣的帖子中討論過了,因爲第二個問題可以重新設計爲2d(您的感興趣軸加上其餘部分),基本問題是1d列表理解。遍歷行。另一個SO問題:https://stackoverflow.com/questions/44239498/how-to-apply-a-generic-function-over-numpy-rows – hpaulj

+0

我希望這些StackOverflow問題包含一個解決方案,使用多個核心,我可以使用!現在,我不確定Python列表理解如何成功比'np.apply_along_axis()'更快,但是至少可以通過探索'np.apply_along_axis()'的簡單替代方法來加快單核版本的速度...... – EOL

回答

1

好吧,我的工作了:一個想法是使用標準multiprocessing模塊並在短短數塊原始數組拆分(以便限制與工人溝通的開銷)。這可以如下被相對容易地完成:

import multiprocessing 

import numpy as np 

def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs): 
    """ 
    Like numpy.apply_along_axis(), but takes advantage of multiple 
    cores. 
    """   
    # Effective axis where apply_along_axis() will be applied by each 
    # worker (any non-zero axis number would work, so as to allow the use 
    # of `np.array_split()`, which is only done on axis 0): 
    effective_axis = 1 if axis == 0 else axis 
    if effective_axis != axis: 
     arr = arr.swapaxes(axis, effective_axis) 

    # Chunks for the mapping (only a few chunks): 
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs) 
       for sub_arr in np.array_split(arr, multiprocessing.cpu_count())] 

    pool = multiprocessing.Pool() 
    individual_results = pool.map(unpacking_apply_along_axis, chunks) 
    # Freeing the workers: 
    pool.close() 
    pool.join() 

    return np.concatenate(individual_results) 

其中Pool.map()正在應用的功能unpacking_apply_along_axis()是獨立的,因爲它應該(使得子過程可以導入的話),並且是一個簡單的薄包裝處理該事實Pool.map()只需要一個參數:

def unpacking_apply_along_axis((func1d, axis, arr, args, kwargs)): 
    """ 
    Like numpy.apply_along_axis(), but and with arguments in a tuple 
    instead. 

    This function is useful with multiprocessing.Pool().map(): (1) 
    map() only handles functions that take a single argument, and (2) 
    this function can generally be imported from a module, as required 
    by map(). 
    """ 
    return np.apply_along_axis(func1d, axis, arr, *args, **kwargs) 

在我的具體情況,這導致2個內核和超線程2倍的速度提升。接近4倍的因素會更好,但加速已經很不錯了,只需幾行代碼,對於具有更多內核的機器(這很常見)應該會更好。也許有避免數據拷貝和使用共享內存的方法(可能通過multiprocessing module本身)?