2017-04-11 85 views
6

假設我有一個NumPy整數數組。向量化切片的最小值和最大值可能嗎?

arr = np.random.randint(0, 1000, 1000) 

我有兩個陣列lowerupper,它們分別代表對arr切片下限和上限。這些間隔是重疊和可變長度的,但是lowersuppers都保證不減少。

lowers = np.array([0, 5, 132, 358, 566, 822]) 
uppers = np.array([45, 93, 189, 533, 800, 923]) 

我想找到分鐘,並通過lowersuppers定義的arr每個切片的最大,而這些存儲在另一個陣列。

out_arr = np.empty((lowers.size, 2)) 

什麼是最有效的方式做到這一點?我很擔心,還沒有一個量化的方法,因爲我看不到我如何避開索引在一個循環..


我目前的做法只是簡單

for i in range(lowers.size): 
    arr_v = arr[lowers[i]:uppers[i]] 
    out_arr[i,0] = np.amin(arr_v) 
    out_arr[i,1] = np.amax(arr_v) 

其葉子我想要的結果像

In [304]: out_arr 
Out[304]: 

array([[ 26., 908.], 
     [ 18., 993.], 
     [ 0., 968.], 
     [ 3., 999.], 
     [ 1., 998.], 
     [ 0., 994.]]) 

但這對我的實際數據太慢了。

+0

多少片,esp與'arr'的大小相比?看起來他們長度不一樣?他們可以重疊?圍繞這種迭代的唯一方法是使用'accumulate'。例如'cumsum'可以在某些情況下工作,例如切片和手段。 – hpaulj

+0

@hpaulj是的,它們是可變長度的,重疊的,對於大約10^7個元素的數組,大約有10^5個切片。這些都是我從db讀取的所有輸入,所以我認爲在此之前沒有任何「更好的第一步」的空間。 –

+0

'np.minimum.reduceat'可能適用。這有點挑剔,因爲人們不得不混合上限和下限。最終,人們只會用較小的數值來將問題減少到同一類型。但可能仍然值得。 –

回答

3

好,這裏是如何至少向下大小使用np.minimum.reduceat原題:

lu = np.r_[lowers, uppers] 
so = np.argsort(lu) 
iso = np.empty_like(so) 
iso[so] = np.arange(len(so)) 
cut = len(lowers) 
lmin = np.minimum.reduceat(arr, lu[so]) 
for i in range(cut): 
    print(min(lmin[iso[i]:iso[cut+i]]), min(arr[lowers[i]:uppers[i]])) 

# 33 33 
# 7 7 
# 5 5 
# 0 0 
# 3 3 
# 7 7 

被擺脫什麼,這並不實現主循環的,但至少數據是從降低1000元素數組到12個元素之一。

更新:

對於小重疊@Eric漢森自己的解決方案是很難被擊敗。我仍然想指出,如果存在重大的重疊,那麼將兩種方法結合起來甚至是值得的。我沒有numba,所以下面僅僅是一個twopass版本,結合我preprossing與Eric的純numpy解決方案,它也充當的onepass形式的基準:

import numpy as np 
from timeit import timeit 

def twopass(lowers, uppers, arr): 
    lu = np.r_[lowers, uppers] 
    so = np.argsort(lu) 
    iso = np.empty_like(so) 
    iso[so] = np.arange(len(so)) 
    cut = len(lowers) 
    lmin = np.minimum.reduceat(arr, lu[so]) 
    return np.minimum.reduceat(lmin, iso.reshape(2,-1).T.ravel())[::2] 

def onepass(lowers, uppers, arr): 
    mixture = np.empty((lowers.size*2,), dtype=lowers.dtype) 
    mixture[::2] = lowers; mixture[1::2] = uppers 
    return np.minimum.reduceat(arr, mixture)[::2] 

arr = np.random.randint(0, 1000, 1000) 
lowers = np.array([0, 5, 132, 358, 566, 822]) 
uppers = np.array([45, 93, 189, 533, 800, 923]) 

print('small') 
for f in twopass, onepass: 
    print('{:18s} {:9.6f} ms'.format(f.__name__, 
            timeit(lambda: f(lowers, uppers, arr), 
              number=10)*100)) 

arr = np.random.randint(0, 1000, 10**6) 
lowers = np.random.randint(0, 8*10**5, 10**4) 
uppers = np.random.randint(2*10**5, 10**6, 10**4) 
swap = lowers > uppers 
lowers[swap], uppers[swap] = uppers[swap], lowers[swap] 


print('large') 
for f in twopass, onepass: 
    print('{:18s} {:10.4f} ms'.format(f.__name__, 
            timeit(lambda: f(lowers, uppers, arr), 
              number=10)*100)) 

採樣運行:

small 
twopass    0.030880 ms 
onepass    0.005723 ms 
large 
twopass    74.4962 ms 
onepass    3153.1575 ms 
+0

謝謝保羅,這太好了。我使用'reduceat'建議調整了一個解決方案,並且自己添加了一個答案,但是我要測試一下,如果它的可比較性,我會在您給出這個想法時將您的答案標記爲已接受。 –

-1

由於循環內的執行速度較慢,子數組被複制到一個數組,然後執行操作。您可避免一行代碼整個循環

out_array = np.array([(np.amin(arr[lowers[i]:uppers[i]]),np.amax(arr[lowers[i]:uppers[i]])) for i in range(lowers.size)]) 
1

我想出了基於的reduceat保羅裝甲的建議,我原來嘗試的改進版本是

mixture = np.empty((lowers.size*2,), dtype=lowers.dtype) 
mixture[::2] = lowers; mixture[1::2] = uppers 

np.column_stack((np.minimum.reduceat(arr, mixture)[::2], 
       np.maximum.reduceat(arr, mixture)[::2])) 

在樣本大小堪比我實際數據,這在我的機器上運行4.22毫秒,而我的原始解決方案需要73毫秒。

甚至更​​快,雖然是隻使用Numba我原來的解決方案

from numba import jit 

@jit 
def get_res(): 
    out_arr = np.empty((lowers.size, 2)) 
    for i in range(lowers.size): 
     arr_v = arr[lowers[i]:uppers[i]] 
     out_arr[i,0] = np.amin(arr_v) 
     out_arr[i,1] = np.amax(arr_v) 
    return out_arr 

它運行在我的機器上100微秒。

+0

如果有人在這裏看到任何可怕的錯誤,請讓我知道! –

+0

整潔。我總是忘記一個人可以用'reduceat'倒退。僅僅是出於興趣,因爲這個numba解決方案非常快。如果你不介意我的問話。你的投入有多大?你在別的地方提過的10^7/10^5嗎?段的重疊程度有多大(比如,段長度與數據長度之比)? –

+0

從一個間隔到下一個間隔的重疊大約是每個間隔總長度的80%......我回來說,在進一步的基準測試中,你的NumPy方法比我的間隔重疊要快。標記爲接受:) –

相關問題