2013-02-14 118 views
8

比方說,我有一個列表:如何計算Python 3中的移動平均值?

y = ['1', '2', '3', '4','5','6','7','8','9','10'] 

我希望創建一個計算移動正天平均值的函數。 所以如果n是5,我想我的代碼計算出第一個1-5,添加它並找到平均值,這將是3.0,然後繼續到2-6,計算平均值,這將是4.0,然後3-7,4-8,5-9,6-10。

我不想計算第n-1天,所以從第n天開始計算前幾天。

def moving_average(x:'list of prices', n): 
    for num in range(len(x)+1): 
     print(x[num-n:num]) 

這似乎是打印出來什麼,我想:

[] 
[] 
[] 
[] 
[] 

['1', '2', '3', '4', '5'] 

['2', '3', '4', '5', '6'] 

['3', '4', '5', '6', '7'] 

['4', '5', '6', '7', '8'] 

['5', '6', '7', '8', '9'] 

['6', '7', '8', '9', '10'] 

不過,我不知道如何計算這些名單內的號碼。有任何想法嗎?

+5

爲什麼你在列表中有字符串而不是數字? – 2013-02-14 21:08:50

回答

18

有一個老版本的Python文檔的一個偉大的滑動窗口發生器itertools examples

from itertools import islice 

def window(seq, n=2): 
    "Returns a sliding window (of width n) over data from the iterable" 
    " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...     " 
    it = iter(seq) 
    result = tuple(islice(it, n)) 
    if len(result) == n: 
     yield result  
    for elem in it: 
     result = result[1:] + (elem,) 
     yield result 

使用您的移動平均線是簡單的:

from __future__ import division # For Python 2 

def moving_averages(values, size): 
    for selection in window(values, size): 
     yield sum(selection)/size 

運行此對您的輸入(將字符串映射爲整數)給出:

>>> y= ['1', '2', '3', '4','5','6','7','8','9','10'] 
>>> for avg in moving_averages(map(int, y), 5): 
...  print(avg) 
... 
3.0 
4.0 
5.0 
6.0 
7.0 
8.0 

要重新轉None第一n - 1迭代 '不完整' 集,只是擴大了moving_averages功能一點:

def moving_averages(values, size): 
    for _ in range(size - 1): 
     yield None 
    for selection in window(values, size): 
     yield sum(selection)/size 
+1

+1我從來沒有見過這個功能。 – placeybordeaux 2013-02-14 21:09:24

+1

我希望結果是[none,none,none,none,3.0,4.0,5.0,6.0,7.0,8.0]儘管 – Kara 2013-02-14 21:13:47

+0

雖然我非常感謝您的優雅解決方案,但我卻將其與追蹤運行總和而不是重新計算總和倍數。請參閱[我的答案](http://stackoverflow.com/a/14942753/923794)。如果將函數簡化爲僅回答原始問題並且不允許其他參數,則這可能會更快。 – cfi 2013-02-19 08:19:48

1

使用summap函數。

print(sum(map(int, x[num-n:num]))) 

map功能在Python 3基本上是一個懶惰這個:

[int(i) for i in x[num-n:num]] 

我敢肯定,你可以猜到sum功能做什麼。

1

是避免重複計算中間和值的方法..

list=range(0,12) 
def runs(v): 
global runningsum 
runningsum+=v 
return(runningsum) 
runningsum=0 
runsumlist=[ runs(v) for v in list ] 
result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)] 

打印結果

[2,3,4,5,6,7,8,9] 

make that runs(int(v)).. then .. rep r(runsumlist [k] - runsumlist [k-5])/ 5) 如果你螞蟻攜帶數字串。


Alt鍵沒有全球:

list = [float[x] for x in range(0,12)] 
nave = 5 
movingave = sum(list[:nave]/nave) 
for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave) 
print movingave 

一定要做到,即使你輸入值是整數

[2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0] 
+0

事實上,運行總和算法更快。我發佈了一個可以證明你的觀點。這裏只需要一個'global'變量。 – cfi 2013-02-18 18:16:35

+0

正確的你,我試圖太難以明確的循環.. – agentp 2013-02-19 18:37:44

4

浮動數學雖然我喜歡這個Martijn's answer,像喬治,我想知道,這不會是通過使用運行總和,而不是在幾乎相同的數字一遍遍應用sum()更快。

同樣在斜坡階段爲None值作爲默認的想法很有趣。實際上,可能會有很多不同的場景可以設想移動平均線。讓我們平均的計算過程分爲三個階段:

  1. 斜升:開始反覆在當前迭代計數<窗口大小
  2. 穩步推進:我們有完全相同的窗口大小數量的元素可用來計算正常average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
  3. 斜坡下降:在輸入數據的最後,我們可以返回另一個window_size - 1「平均」的數字。

下面是接受

  • 任意iterables(發電機是精細)作爲用於數據輸入
  • 任意窗口尺寸> = 1個
  • 參數期間接通/斷開生產值的函數對於這些階段斜坡向上/向下
  • 回調函數的相位控制值是如何產生的。這可以用來不斷地提供一個默認的(如None),或提供部分平均值

下面的代碼:

from collections import deque 

def moving_averages(data, size, rampUp=True, rampDown=True): 
    """Slide a window of <size> elements over <data> to calc an average 

    First and last <size-1> iterations when window is not yet completely 
    filled with data, or the window empties due to exhausted <data>, the 
    average is computed with just the available data (but still divided 
    by <size>). 
    Set rampUp/rampDown to False in order to not provide any values during 
    those start and end <size-1> iterations. 
    Set rampUp/rampDown to functions to provide arbitrary partial average 
    numbers during those phases. The callback will get the currently 
    available input data in a deque. Do not modify that data. 
    """ 
    d = deque() 
    running_sum = 0.0 

    data = iter(data) 
    # rampUp 
    for count in range(1, size): 
     try: 
      val = next(data) 
     except StopIteration: 
      break 
     running_sum += val 
     d.append(val) 
     #print("up: running sum:" + str(running_sum) + " count: " + str(count) + " deque: " + str(d)) 
     if rampUp: 
      if callable(rampUp): 
       yield rampUp(d) 
      else: 
       yield running_sum/size 

    # steady 
    exhausted_early = True 
    for val in data: 
     exhausted_early = False 
     running_sum += val 
     #print("st: running sum:" + str(running_sum) + " deque: " + str(d)) 
     yield running_sum/size 
     d.append(val) 
     running_sum -= d.popleft() 

    # rampDown 
    if rampDown: 
     if exhausted_early: 
      running_sum -= d.popleft() 
     for (count) in range(min(len(d), size-1), 0, -1): 
      #print("dn: running sum:" + str(running_sum) + " deque: " + str(d)) 
      if callable(rampDown): 
       yield rampDown(d) 
      else: 
       yield running_sum/size 
      running_sum -= d.popleft() 

這似乎是有點比馬亭的版本快 - 這是更爲雖然優雅。下面是測試代碼:

print("") 
print("Timeit") 
print("-" * 80) 

from itertools import islice 
def window(seq, n=2): 
    "Returns a sliding window (of width n) over data from the iterable" 
    " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...     " 
    it = iter(seq) 
    result = tuple(islice(it, n)) 
    if len(result) == n: 
     yield result  
    for elem in it: 
     result = result[1:] + (elem,) 
     yield result 

# Martijn's version: 
def moving_averages_SO(values, size): 
    for selection in window(values, size): 
     yield sum(selection)/size 


import timeit 
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)] 
for problem_size in problems: 
    print("{:12s}".format(str(problem_size)), end="") 

    so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size, 
         setup="from __main__ import moving_averages_SO") 
    print("{:12.3f} ".format(min(so)), end="") 

    my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size, 
         setup="from __main__ import moving_averages") 
    print("{:12.3f} ".format(min(my)), end="") 

    print("") 

和輸出:

Timeit 
-------------------------------------------------------------------------------- 
10     7.242  7.656 
100    5.816  5.500 
1000    5.787  5.244 
10000    5.782  5.180 
100000    5.746  5.137 
1000000   5.745  5.198 
10000000   5.764  5.186 

原來的問題現在可以調用這個函數來解決:

print(list(moving_averages(range(1,11), 5, 
          rampUp=lambda _: None, 
          rampDown=False))) 

輸出:

[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 
0

還有另一種解決方案extendin g itertools配方pairwise()。您可以擴展這nwise(),它給你的滑動窗口(如果可迭代是發電機的工作原理):

def nwise(iterable, n): 
    ts = it.tee(iterable, n) 
    for c, t in enumerate(ts): 
     next(it.islice(t, c, c), None) 
    return zip(*ts) 

def moving_averages_nw(iterable, n): 
    yield from (sum(x)/n for x in nwise(iterable, n)) 

>>> list(moving_averages_nw(range(1, 11), 5)) 
[3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 

雖然短iterable個比較高的設置成本這一成本的影響降低了較長的數據組。這使用sum(),但代碼相當優雅:

Timeit    MP   cfi   ***** 
-------------------------------------------------------------------------------- 
10     4.658  4.959  7.351 
100    5.144  4.070  4.234 
1000    5.312  4.020  3.977 
10000    5.317  4.031  3.966 
100000    5.508  4.115  4.087 
1000000   5.526  4.263  4.202 
10000000   5.632  4.326  4.242