2017-03-27 26 views
1

我使用的是Rx.Net,並且我有Observable可以發出時間序列點(double,timestamp)。每次新點到達時,我都想從最後30秒計算平均值。我想我需要一些不基於計數但是時間戳的重疊窗口/緩衝區。根據時間戳實現與ReactiveX的移動平均值

我發現this主題與SlidingWindow實現,但我無法弄清楚如何適應我的問題。

編輯:

感謝this我才知道,我可以使用掃描操作和緩衝區我的觀點,所以這個basicly解決了這個問題。但也許有更好的方法來做到這一點?

回答

0

BufferWindow向前看,你想要的東西回頭看。 Scan是最好的起點:

public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts) 
{ 
    return BackBuffer(source, ts, Scheduler.Default); 
} 
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler) 
{ 
    return source 
     .Timestamp() 
     .Scan(new List<Timestamped<T>>(), (list, element) => list 
      .Where(ti => scheduler.Now - ti.Timestamp <= ts) 
      .Concat(Enumerable.Repeat(element, 1)) 
      .ToList() 
     ) 
     .Select(list => list.Select(t => t.Value).ToList()); 
} 

一旦你有了BackBuffer,或類似的東西,那麼剩下變得容易:

source 
    .BackBuffer(TimeSpan.FromMilliseconds(70)) 
    .Select(list => list.Average()) 
    .Subscribe(average => Console.WriteLine(average));