2010-07-09 62 views
25

我想要有效地調節事件流,以便在接收到第一個事件時調用我的代理,但如果接收到後續事件,則不會持續1秒。超時後(1秒),如果接收到後續事件,我希望我的代理被調用。如何用RX節制事件流?

有沒有簡單的方法來使用Reactive Extensions來做到這一點?

示例代碼:

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var builder = new StringBuilder(); 

    generator 
     .Sample(TimeSpan.FromSeconds(1)) 
     .Finally(() => Console.WriteLine(builder.ToString())) 
     .Subscribe(feed => 
        builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}", 
                feed.Value, 
                feed.Timestamp.ToString("mm:ss.fff"), 
                DateTime.Now.ToString("mm:ss.fff")))); 

    Console.ReadKey(); 
} 

電流輸出:

Running... 
Observed 064, generated at 41:43.602, observed at 41:43.602 
Observed 100, generated at 41:44.165, observed at 41:44.602 

但我想觀察(時間戳顯然會發生變化)

Running... 
Observed 001, generated at 41:43.602, observed at 41:43.602 
.... 
Observed 100, generated at 41:44.165, observed at 41:44.602 
+14

這只是一個很酷的拉姆達聲明'X => X <= 100') – Oliver 2010-07-09 08:52:27

回答

11

這裏是我從RX論壇一些幫助了:

的想法是發出一系列的「入場券」的原始序列火的。這些「票據」因超時而延遲,不包括第一個,它會立即被預先寫入票據序列。當一個事件進入並且有一張票正在等待時,該事件立即發生,否則它會一直等到票證然後再發射。當它發生火災時,下一張票被髮出,等等......

要合併票和原始事件,我們需要一個組合器。不幸的是,「標準」.CombineLatest在這裏不能使用,因爲它會觸發先前使用的票據和事件。所以我不得不創建自己的combinator,它基本上是一個過濾的.CombineLatest,只有當組合中的兩個元素都是「新鮮」時纔會觸發 - 之前從未返回。我把它又名.CombineVeryLatest .BrokenZip;)

使用.CombineVeryLatest,上述想法可以實現這樣的:

public static IObservable<T> SampleResponsive<T>(
     this IObservable<T> source, TimeSpan delay) 
    { 
     return source.Publish(src => 
     { 
      var fire = new Subject<T>(); 

      var whenCanFire = fire 
       .Select(u => new Unit()) 
       .Delay(delay) 
       .StartWith(new Unit()); 

      var subscription = src 
       .CombineVeryLatest(whenCanFire, (x, flag) => x) 
       .Subscribe(fire); 

      return fire.Finally(subscription.Dispose); 
     }); 
    } 

    public static IObservable<TResult> CombineVeryLatest 
     <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
     IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
    { 
     var ls = leftSource.Select(x => new Used<TLeft>(x)); 
     var rs = rightSource.Select(x => new Used<TRight>(x)); 
     var cmb = ls.CombineLatest(rs, (x, y) => new { x, y }); 
     var fltCmb = cmb 
      .Where(a => !(a.x.IsUsed || a.y.IsUsed)) 
      .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; }); 
     return fltCmb.Select(a => selector(a.x.Value, a.y.Value)); 
    } 

    private class Used<T> 
    { 
     internal T Value { get; private set; } 
     internal bool IsUsed { get; set; } 

     internal Used(T value) 
     { 
      Value = value; 
     } 
    } 

編輯:這裏是在論壇上提出的由AndreasKöpf賓館CombineVeryLatest的另一個更緊湊變化:

public static IObservable<TResult> CombineVeryLatest 
    <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
{ 
    return Observable.Defer(() => 
    { 
     int l = -1, r = -1; 
     return Observable.CombineLatest(
      leftSource.Select(Tuple.Create<TLeft, int>), 
      rightSource.Select(Tuple.Create<TRight, int>), 
       (x, y) => new { x, y }) 
      .Where(t => t.x.Item2 != l && t.y.Item2 != r) 
      .Do(t => { l = t.x.Item2; r = t.y.Item2; }) 
      .Select(t => selector(t.x.Item1, t.y.Item1)); 
    }); 
} 
0

您是否嘗試過的Throttle擴展方法?

從文檔:

從後面是另一個值duetime參數

這不是很清楚,我是否是會做你想要或者不是一個觀察的序列忽略值 - 因爲你想忽略下面的值而不是第一個值......但我會期望它是你想要的。試試:)

編輯:嗯......不,我不認爲Throttle是正確的事情,畢竟。我相信我看到了你想要做的事情,但是我看不出在框架中做什麼。儘管我可能錯過了一些東西。你有沒有在Rx論壇上提問?它很可能是,如果它現在不在那裏,他們會很樂意將其添加:)

嫌疑你可以用SkipUntilSelectMany莫名其妙地做到這一點狡猾......但我認爲它應該是在其自己的方法。

+0

感謝喬恩。我給了它一個鏡頭,但它不是我想要的。 在這個例子中,使用Throttle會導致除最後一個事件之外的所有事件都被忽略。我需要對第一個事件作出反應(以提供響應系統),但是爲了隨後的事件以1秒的採樣率延遲。 – Alex 2010-07-09 09:34:44

+0

@亞歷克斯:是的,這也是我發現的。 (看我的編輯。) – 2010-07-09 09:46:03

0

您在搜索的是CombineLatest。

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector 
) 

合併2個可觀察項,並返回所有值,當選擇器(時間)有一個值。

編輯:約翰是正確的,那就是也許不是首選的解決方案

+0

我不明白他是怎麼回事 - 你在這裏看到兩個可觀察到的情況? – 2010-07-09 09:06:29

+0

一個是他生成的事件,選擇器是一個Observable.Interval(TimeSpan.FromSeconds(1)) – cRichter 2010-07-09 09:31:28

+0

每次* *中哪一個產生一個值都不會產生一個事件嗎? – 2010-07-09 09:34:38

9

好吧,

你有3個場景中的位置:

1)我想獲得的一個值事件流每秒。 意味着:如果它每秒產生更多的事件,你將得到一個總是更大的緩衝區。

observableStream.Throttle(timeSpan) 

2)我想獲得最新的事件,這是第二個發生 手段之前生產的:其他事件會被丟棄。

observableStream.Sample(TimeSpan.FromSeconds(1)) 

3)你想獲得所有事件,發生在最後一秒。和每一秒鐘

observableStream.BufferWithTime(timeSpan) 

4)要選擇第二之間發生了什麼與所有的值,直到第二過去了,你的結果返回

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent) 
+1

Dang,場景2正是我正在尋找的,並且我一直無法找到該方法:( – deadlydog 2012-10-23 18:17:31

+1

如果有人需要它:'stream。示例(TimeSpan.FromSeconds(1))' – AlexFoxGill 2013-06-24 14:56:05

2

好吧,這裏是一個解。我不喜歡它,特別是,但是...哦。

喬恩給我的指導是在SkipWhile和cRichter的BufferWithTime。多謝你們。

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1)); 

    var action = new Action<Timestamped<int>>(
     feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}", 
            feed.Value, 
            feed.Timestamp.ToString("mm:ss.fff"), 
            DateTime.Now.ToString("mm:ss.fff"))); 

    var reactImmediately = true; 
    bufferedAtOneSec.Subscribe(list => 
            { 
             if (list.Count == 0) 
             { 
              reactImmediately = true; 
             } 
             else 
             { 
              action(list.Last()); 
             } 
            }); 
    generator 
     .SkipWhile(item => reactImmediately == false) 
     .Subscribe(feed => 
         { 
          if(reactImmediately) 
          { 
           reactImmediately = false; 
           action(feed); 
          } 
         }); 

    Console.ReadKey(); 
} 
5

這是什麼我張貼作爲回答這個問題在Rx forum

UPDATE: 這是一個新的版本時,作爲事件的時有發生,它不再拖延事件轉發超過一秒鐘的差別:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval) 
{ 
    return Observable.CreateWithDisposable<T>(o => 
    { 
     object gate = new Object(); 
     Notification<T> last = null, lastNonTerminal = null; 
     DateTime referenceTime = DateTime.UtcNow - minInterval; 
     var delayedReplay = new MutableDisposable(); 
     return new CompositeDisposable(source.Materialize().Subscribe(x => 
     { 
      lock (gate) 
      { 
       var elapsed = DateTime.UtcNow - referenceTime; 
       if (elapsed >= minInterval && delayedReplay.Disposable == null) 
       { 
        referenceTime = DateTime.UtcNow; 
        x.Accept(o); 
       } 
       else 
       { 
        if (x.Kind == NotificationKind.OnNext) 
         lastNonTerminal = x; 
        last = x; 
        if (delayedReplay.Disposable == null) 
        { 
         delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() => 
         { 
          lock (gate) 
          { 
           referenceTime = DateTime.UtcNow; 
           if (lastNonTerminal != null && lastNonTerminal != last) 
            lastNonTerminal.Accept(o); 
           last.Accept(o); 
           last = lastNonTerminal = null; 
           delayedReplay.Disposable = null; 
          } 
         }, minInterval - elapsed); 
        } 
       } 
      } 
     }), delayedReplay); 
    }); 
} 

這是我早期的嘗試:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
    .Timestamp(); 

source.Publish(o => 
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1))) 
).Run(x => Console.WriteLine(x)); 
+4

剛剛在Microsoft Todo UWP應用程序中找到此答案的鏈接點數:) – 2017-09-19 12:57:14

6

我這個同樣的問題,昨晚掙扎,並且我相信我已經找到了一個更優雅的(或至少縮短)解決方案:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1)); 
var throttledSource = source.Take(1).Concat(delay).Repeat(); 
0

受Bluelings答案的啓發我在這裏提供了一個與Reactive Extensions 2.2.5編譯的版本。

此特定版本計數樣本數量並提供最後一個採樣值。要做到這一點下面的類用於:

class Sample<T> { 

    public Sample(T lastValue, Int32 count) { 
    LastValue = lastValue; 
    Count = count; 
    } 

    public T LastValue { get; private set; } 

    public Int32 Count { get; private set; } 

} 

這是運營商:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) { 
    if (source == null) 
    throw new ArgumentNullException(nameof(source)); 
    return Observable.Create<Sample<T>>(
    observer => { 
     var gate = new Object(); 
     var lastSampleValue = default(T); 
     var lastSampleTime = default(DateTime); 
     var sampleCount = 0; 
     var scheduledTask = new SerialDisposable(); 
     return new CompositeDisposable(
     source.Subscribe(
      value => { 
      lock (gate) { 
       var now = DateTime.UtcNow; 
       var elapsed = now - lastSampleTime; 
       if (elapsed >= interval) { 
       observer.OnNext(new Sample<T>(value, 1)); 
       lastSampleValue = value; 
       lastSampleTime = now; 
       sampleCount = 0; 
       } 
       else { 
       if (scheduledTask.Disposable == null) { 
        scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
        interval - elapsed, 
        () => { 
         lock (gate) { 
         if (sampleCount > 0) { 
          lastSampleTime = DateTime.UtcNow; 
          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
          sampleCount = 0; 
         } 
         scheduledTask.Disposable = null; 
         } 
        } 
       ); 
       } 
       lastSampleValue = value; 
       sampleCount += 1; 
       } 
      } 
      }, 
      error => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnError(error); 
      }, 
     () => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnCompleted(); 
      } 
     ), 
     scheduledTask 
    ); 
    } 
); 
}