2010-06-17 215 views
9

我在下面包含的python中執行嵌套循環。這是搜索現有金融時間序列並尋找符合某些特徵的時間序列的基本方式。在這種情況下,有兩個單獨的同樣大小的數組表示「關閉」(即資產價格)和「交易量」(即在此期間交換的資產量)。對於每個時間段,我想期待在未來所有的間隔長度1和INTERVAL_LENGTH之間並看看是否有這些間隔有符合我搜索(在這種情況下的特性密切值的比例比1.0001少較大超過1.5並且總體積大於100)。如何加速python嵌套循環?

我的理解是,使用NumPy時加速的主要原因之一是解釋器不需要在每次評估某些東西時鍵入檢查操作數,只要您將操作數組作爲整個(例如numpy_array * 2),但顯然下面的代碼沒有利用這一點。有沒有辦法用某種可能導致加速的窗口函數替代內部循環,或者使用numpy/scipy以其他方式在本地python中大幅加速?

或者,有沒有更好的方法來做到這一點(例如,在C++中編寫此循環並使用編織會更快)?

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.array(xrange(ARRAY_LENGTH)) 
volume = np.array(xrange(ARRAY_LENGTH)) 
close, volume = close.astype('float64'), volume.astype('float64') 

results = [] 
for i in xrange(len(close) - INTERVAL_LENGTH): 
    for j in xrange(i+1, i+INTERVAL_LENGTH): 
     ret = close[j]/close[i] 
     vol = sum(volume[i+1:j+1]) 
     if ret > 1.0001 and ret < 1.5 and vol > 100: 
      results.append([i, j, ret, vol]) 
print results 
+1

您的計算看起來很簡單,您爲什麼不使用Cython? – Tarantula 2010-06-17 21:19:15

回答

6

更新:(幾乎)完全量化低於版本 「new_function2」 ......

我會添加註釋來解釋的東西位。

它給出了〜50倍的加速比,如果你對輸出是numpy數組而不是列表的輸出沒問題,那麼可能會有更大的加速比。原因是:

In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 1.15 s per loop 

您可以使用np.cumsum()調用來替換內循環...見下面我的「new_function」函數。這給出了一個相當的加速...

In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 15.7 s per loop 

VS

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 53.1 s per loop 

應該可以向量化整個事情,避免循環完全,但...給我一分鐘,我會看到我能做什麼......

import numpy as np 

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.arange(ARRAY_LENGTH, dtype=np.float) 
volume = np.arange(ARRAY_LENGTH, dtype=np.float) 

def old_function(close, volume, INTERVAL_LENGTH): 
    results = [] 
    for i in xrange(len(close) - INTERVAL_LENGTH): 
     for j in xrange(i+1, i+INTERVAL_LENGTH): 
      ret = close[j]/close[i] 
      vol = sum(volume[i+1:j+1]) 
      if (ret > 1.0001) and (ret < 1.5) and (vol > 100): 
       results.append((i, j, ret, vol)) 
    return results 


def new_function(close, volume, INTERVAL_LENGTH): 
    results = [] 
    for i in xrange(close.size - INTERVAL_LENGTH): 
     vol = volume[i+1:i+INTERVAL_LENGTH].cumsum() 
     ret = close[i+1:i+INTERVAL_LENGTH]/close[i] 

     filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100) 
     j = np.arange(i+1, i+INTERVAL_LENGTH)[filter] 

     tmp_results = zip(j.size * [i], j, ret[filter], vol[filter]) 
     results.extend(tmp_results) 
    return results 

def new_function2(close, volume, INTERVAL_LENGTH): 
    vol, ret = [], [] 
    I, J = [], [] 
    for k in xrange(1, INTERVAL_LENGTH): 
     start = k 
     end = volume.size - INTERVAL_LENGTH + k 
     vol.append(volume[start:end]) 
     ret.append(close[start:end]) 
     J.append(np.arange(start, end)) 
     I.append(np.arange(volume.size - INTERVAL_LENGTH)) 

    vol = np.vstack(vol) 
    ret = np.vstack(ret) 
    J = np.vstack(J) 
    I = np.vstack(I) 

    vol = vol.cumsum(axis=0) 
    ret = ret/close[:-INTERVAL_LENGTH] 

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100) 

    vol = vol[filter] 
    ret = ret[filter] 
    I = I[filter] 
    J = J[filter] 

    output = zip(I.flat,J.flat,ret.flat,vol.flat) 
    return output 

results = old_function(close, volume, INTERVAL_LENGTH) 
results2 = new_function(close, volume, INTERVAL_LENGTH) 
results3 = new_function(close, volume, INTERVAL_LENGTH) 

# Using sets to compare, as the output 
# is in a different order than the original function 
print set(results) == set(results2) 
print set(results) == set(results3) 
3

一個加速。將除去sum部,如在該實施方式中它通過INTERVAL_LENGTH總結長度爲2的列表。相反,只需將volume[j+1]添加到循環最後一次迭代的vol的前一個結果中。因此,每次只需添加兩個整數,而不是每次總結整個列表並對其進行切片。此外,而不是通過做sum(volume[i+1:j+1])開始,只是做vol = volume[i+1] + volume[j+1],因爲你知道初始情況下,這裏將永遠是隻有兩個指標。

另一個加速將使用.extend而不是.append,因爲python執行有extend運行速度顯着加快。

您也可以拆分最後的if語句,以便僅在需要時才執行某些計算。例如,你知道if vol <= 100,你不需要計算ret

這並不是完全回答你的問題,但我認爲特別是總和問題,你應該看到這些改變顯着加速。

編輯 - 你也不需要len,因爲你已經明確知道列表的長度(除非這只是例子)。將其定義爲數字而不是len(something)總是更快。

編輯 - 執行(這是未經測試):

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.array(xrange(ARRAY_LENGTH)) 
volume = np.array(xrange(ARRAY_LENGTH)) 
close, volume = close.astype('float64'), volume.astype('float64') 

results = [] 
ex = results.extend 
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH): 
    vol = volume[i+1] 
    for j in xrange(i+1, i+INTERVAL_LENGTH): 
     vol += volume[j+1] 
     if vol > 100: 
      ret = close[j]/close[i] 
      if 1.0001 < ret < 1.5: 
       ex([i, j, ret, vol]) 
print results 
+0

另一個加速將會定義'extend_results = results.extend'一次(在循環之前),然後在循環內部使用'extend_results([i,j,ret,vol])'來避免查找,但總是測量(用' timeit'模塊) – ChristopheD 2010-06-17 21:34:45

+0

有趣!查找時間有多大,一般?通常是一種有用的加速,還是因爲這個特定循環的幅度更大? – nearlymonolith 2010-06-17 21:43:01

+2

@Anthony Morelli:局部變量查找很多因爲'編譯器'優化了函數體,因此局部變量不需要字典查找,所以全局或內置變量查找耗時更少(另請參見:http://www.python.org/doc/essays/list2str.html) 。但是一般來說基準測試總是必要的,因爲(不成功的)範圍查找時間受到需要考慮的項目大小的影響等等。但是,對於這樣大的循環來說,考慮這種技術是可行的(未測試) ,但它至少應該在一定程度上更快)。 – ChristopheD 2010-06-17 22:09:58

1

你爲什麼不嘗試生成的結果作爲一個單獨的列表,像(不是追加或延長快得多):

results = [ t for t in ((i, j, close[j]/close[i], sum(volume[i+1:j+1])) 
         for i in xrange(len(close)-INT_LEN) 
          for j in xrange(i+1, i+INT_LEN) 
         ) 
      if t[3] > 100 and 1.0001 < t[2] < 1.5 
      ]