2015-11-02 60 views
2

我想在Rx.NET中實現一個信號過濾器,它以一組初始係數開始。隨着時間的推移,必須從觀測數據值的快照重新計算濾波器係數。使用Rx.NET的信號過濾器

這是一個小的原型,它顯示了它應該如何工作。爲了簡單起見,我選擇濾波器長度和用於重新計算濾波器係數的歷史數據值的數量(在本例中爲3)。

該示例使用bufferedAt10中的副作用重新計算係數。這不是我想要的。

在實際應用中,數據以不規則的時間步進行,係數應該在特定時間每天更新一次或更新一次。我可以很容易地使緩衝區更長,但是我怎樣讓系統運行並以清晰的功能方式從觀察者處改變濾波器係數?

 // create a hot obvervable to produce data 
     const int bufLen = 3; 
     var rng = new Random(); 
     var period = TimeSpan.FromSeconds(0.5); 
     var observable = Observable.Interval(period) 
     .Select(i => new {Time = DateTime.Now, Price = rng.NextDouble()}) 
     .Do(e => Console.WriteLine("original : {0}", e)) 
     .Publish(); 
     observable.Connect(); 

     Console.WriteLine("Press any key to subscribe"); 
     Console.ReadKey(); 

     // buffer of length bufLen used for filter calculation (every tick) and filter 
     // coefficient update (at a lower frequency) 
     var buffered = observable.Buffer(bufLen, 1); 

     // apply the signal filter with coefficients in `coeff` 
     var coeff = new List<Double>() {1.0, 1.0, 1.0}; // these will be updated on the way from new data 
     var filtered = buffered.Select(e => 
     { 
      var f = 0.0; 
      for (var i = 0; i < bufLen; i++) 
      { 
       f += e[i].Price*coeff[i]; // apply the filter with coefficients `coeff` 
      } 
      return new {Time = DateTime.Now, FilteredPrice = f}; 
     }); 

     var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2])); 

     // recalculate the filter coefficients say every 10 seconds 
     var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds/10) * 10); 

     var s2 = bufferedAt10.Subscribe(e => 
     { 
      Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time); 
      for (var i = 0; i < bufLen; i++) 
      { 
       // a prototypical function that takes the buffer and uses it to "recalibrate" the filter coefficients 
       coeff[i] = coeff[i] + e[bufLen - 1 - i].Price; 
      } 
      Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]); 
     }); 

感謝您的任何好建議。

回答

1

以下是未經測試,但我認爲它應該涵蓋你所需要的。其背後的想法是,你分流的係數更新一個係數,然後把它們帶回WithLatestFrom。我用SampleScan來執行期間「調整」。而您的自定義時間戳可以通過使用TimeStamp運算符來完成。您也可以考慮將Publish向下移動,否則您將有兩個數據流生成緩衝區,但這取決於您。

const int bufLen = 3; 
var rng = new Random(); 
var period = TimeSpan.FromSeconds(0.5); 
var observable = Observable.Interval(period) 
    .Select(=> rng.NextDouble()) 
    .Publish(); 
observable.Connect(); 

Console.WriteLine("Press any key to subscribe"); 
Console.ReadKey(); 

var buffered = observable.Buffer(bufLen, 1); 

var seed = new [] {1.0, 1.0, 1.0}; 

var coefficients = buffered 
        //Samples for a new value every 10 seconds 
        .Sample(TimeSpan.FromSeconds(10)) 
        //Updates the seed value and emits it after every update 
        .Scan(seed, 
         //Use good old fashion Linq 
         (coeff, delta) => coeff.Zip(delta.Reverse(), 
             (c, d) => c + d.Price) 
             .ToArray() 
         ); 

//Emits a new value everytime buffer emits, and combines it with the latest 
//values from the coefficients Observable 
//Kick off coefficients with the seed otherwise you need to wait 10 seconds 
//for the first value. 
buffer.WithLatestFrom(coefficients.StartWith(seed), (e, coeff) => { 
    return e.Zip(coeff, (x, c) => x.Price * c).Sum(); 
}) 
.TimeStamp() 
.Subscribe(e => Console.WriteLine("filtered : {0}", e); 
+0

非常感謝,我明白了。我想'WithLatestFrom'應該是'CombineLatest'。 – Daniel

+0

@Daniel'CombineLatest'將觸發源* Observable * *。 'WithLatestFrom'只有在第一個源發出時纔會觸發。它只能在預發佈版本2.3.0-beta中提供 – paulpdaniels