2011-09-29 99 views
21

我使用的反應擴展到數據整理成的100ms的緩衝:反應式擴展程序是否支持滾動緩衝區?

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100)) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

這工作得很好。不過,我想要的行爲與Buffer操作提供的行爲稍有不同。實質上,如果收到另一個數據項,我想重置定時器。只有當整個100毫秒內沒有收到數據時,我纔想要處理它。這開放了從來沒有處理數據的可能性,所以我應該也能夠指定最大數量。我會想象一下:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000) 

我已經看了一下,在Rx中找不到這樣的東西?任何人都可以確認/否認這一點?

+0

我確定我在Rx上的其中一個教程視頻中看到了這種行爲,但恐怕我不記得在哪裏或哪個位置。 :( – Chris

+0

啊,油門(http://msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx)是我在想什麼,但我不認爲這是做什麼你需要的是自己的,不知道是否有某種方法可以將它結合起來做想要的事情... – Chris

回答

12

我寫了一個擴展名,可以完成大部分的功能 - BufferWithInactivity

這就是:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

+1謝謝。你是僅僅爲了這個問題或者爲你自己寫這個東西的嗎?它是否被用在生產代碼中? –

+0

@KentBoogaart - 我在幾個月前寫過,但它還沒有在生產代碼中,它仍然是一個WIP。 – Enigmativity

+0

+1 SerialDisposable的好用法... –

0

我想這可以緩衝方法之上實現,如下圖所示:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max) 
     { 
      return Observable.CreateWithDisposable<IList<T>>(cl => 
      { 
       var acc = new List<T>(); 
       return obs.Buffer(span) 
         .Subscribe(next => 
         { 
          if (next.Count == 0) //no activity in time span 
          { 
           cl.OnNext(acc); 
           acc.Clear(); 
          } 
          else 
          { 
           acc.AddRange(next); 
           if (acc.Count >= max) //max items collected 
           { 
            cl.OnNext(acc); 
            acc.Clear(); 
           } 
          } 
         }, err => cl.OnError(err),() => { cl.OnNext(acc); cl.OnCompleted(); }); 
      }); 
     } 

注:我沒有測試過,但我希望它給你的想法。

12

這可以通過組合Observable的內置WindowThrottle方法來實現。首先,讓我們來解決,我們忽略了最大計數條件簡單的問題:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay) 
{ 
    var closes = stream.Throttle(delay); 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

強大Window method確實繁重。現在很容易看到如何添加最大數量:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null) 
{ 
    var closes = stream.Throttle(delay); 
    if (max != null) 
    { 
     var overflows = stream.Where((x,index) => index+1>=max); 
     closes = closes.Merge(overflows); 
    } 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

我會在博客上寫一篇文章解釋這一點。 https://gist.github.com/2244036

的文檔窗口的方法:

+0

有了上面的BufferUntilInactive場景 - 如果用戶比生產者慢,你可能會看到一個場景,下一組窗口項目將被緩衝,並且不會被推送給用戶,除非產生一個項目... –

+0

我已經附上一個樣本http://snipt.org/Bhao0。在visual studio中(1)打開輸出窗口(2)檢查掛起按鈕(3)點擊按鈕(4)等待它在控制檯上打印「立即點擊」。 (5)按下按鈕三次,您將看到錯過了三次點擊。 –

3

,其中Rx擴展2.0,你可以回答一個新的緩衝這兩個要求重載接受超時和大小:

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100), 1) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

有關文檔,請參閱https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

+0

但是,這不會有一個滑動窗口,這是所要求的那種「去抖」行爲? – Cocowalla

+0

@Cocowalla我重讀了原來的問題,我提供的代碼確實滿足了所有要求。我已經在生產代碼中使用了它,並取得了巨大成功 –

+0

對不起,我的意思是具體的反跳行爲:「我想重置計時器,如果收到另一個數據項」 - 我沒有看到你的代碼這樣做? AFAICS,你的代碼將每隔100毫秒將緩衝區推送給訂閱者(只要它不是空的) – Cocowalla