浮動數學雖然我喜歡這個Martijn's answer,像喬治,我想知道,這不會是通過使用運行總和,而不是在幾乎相同的數字一遍遍應用sum()
更快。
同樣在斜坡階段爲None
值作爲默認的想法很有趣。實際上,可能會有很多不同的場景可以設想移動平均線。讓我們平均的計算過程分爲三個階段:
- 斜升:開始反覆在當前迭代計數<窗口大小
- 穩步推進:我們有完全相同的窗口大小數量的元素可用來計算正常
average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
- 斜坡下降:在輸入數據的最後,我們可以返回另一個
window_size - 1
「平均」的數字。
下面是接受
- 任意iterables(發電機是精細)作爲用於數據輸入
- 任意窗口尺寸> = 1個
- 參數期間接通/斷開生產值的函數對於這些階段斜坡向上/向下
- 回調函數的相位控制值是如何產生的。這可以用來不斷地提供一個默認的(如
None
),或提供部分平均值
下面的代碼:
from collections import deque
def moving_averages(data, size, rampUp=True, rampDown=True):
"""Slide a window of <size> elements over <data> to calc an average
First and last <size-1> iterations when window is not yet completely
filled with data, or the window empties due to exhausted <data>, the
average is computed with just the available data (but still divided
by <size>).
Set rampUp/rampDown to False in order to not provide any values during
those start and end <size-1> iterations.
Set rampUp/rampDown to functions to provide arbitrary partial average
numbers during those phases. The callback will get the currently
available input data in a deque. Do not modify that data.
"""
d = deque()
running_sum = 0.0
data = iter(data)
# rampUp
for count in range(1, size):
try:
val = next(data)
except StopIteration:
break
running_sum += val
d.append(val)
#print("up: running sum:" + str(running_sum) + " count: " + str(count) + " deque: " + str(d))
if rampUp:
if callable(rampUp):
yield rampUp(d)
else:
yield running_sum/size
# steady
exhausted_early = True
for val in data:
exhausted_early = False
running_sum += val
#print("st: running sum:" + str(running_sum) + " deque: " + str(d))
yield running_sum/size
d.append(val)
running_sum -= d.popleft()
# rampDown
if rampDown:
if exhausted_early:
running_sum -= d.popleft()
for (count) in range(min(len(d), size-1), 0, -1):
#print("dn: running sum:" + str(running_sum) + " deque: " + str(d))
if callable(rampDown):
yield rampDown(d)
else:
yield running_sum/size
running_sum -= d.popleft()
這似乎是有點比馬亭的版本快 - 這是更爲雖然優雅。下面是測試代碼:
print("")
print("Timeit")
print("-" * 80)
from itertools import islice
def window(seq, n=2):
"Returns a sliding window (of width n) over data from the iterable"
" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... "
it = iter(seq)
result = tuple(islice(it, n))
if len(result) == n:
yield result
for elem in it:
result = result[1:] + (elem,)
yield result
# Martijn's version:
def moving_averages_SO(values, size):
for selection in window(values, size):
yield sum(selection)/size
import timeit
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)]
for problem_size in problems:
print("{:12s}".format(str(problem_size)), end="")
so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size,
setup="from __main__ import moving_averages_SO")
print("{:12.3f} ".format(min(so)), end="")
my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size,
setup="from __main__ import moving_averages")
print("{:12.3f} ".format(min(my)), end="")
print("")
和輸出:
Timeit
--------------------------------------------------------------------------------
10 7.242 7.656
100 5.816 5.500
1000 5.787 5.244
10000 5.782 5.180
100000 5.746 5.137
1000000 5.745 5.198
10000000 5.764 5.186
原來的問題現在可以調用這個函數來解決:
print(list(moving_averages(range(1,11), 5,
rampUp=lambda _: None,
rampDown=False)))
輸出:
[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
爲什麼你在列表中有字符串而不是數字? – 2013-02-14 21:08:50