2013-05-09 51 views
1

時間跨度我有以下類型的未來事件的熱氣流:無擴展GROUP BY,獨特的BufferWindow直到與消除

Event 
{ 
    string name; 
    int state ; // its 1 or 2 ie active or unactive 
} 

有提供給定名稱的父名稱的函數 - 字符串的getParent(串名字)

我需要緩衝每個父母的事件2分鐘,如果在這2分鐘內,我recv任何事件的孩子狀態= 2爲給定的父母,這個緩衝區應該取消,應該輸出0,否則我得到事件回顧計數。

我知道我必須使用GroupBy分區,然後緩衝,然後計數,但我無法想到一種方式,我創建緩衝區,這是獨特的每個父母,我雖然使用獨特但這並不解決問題,因爲我只是不想創建緩衝區,直到父活躍(因爲一旦父緩衝區被取消或2分鐘結束,父緩衝區可以再次創建) 所以我明白我需要創建一個自定義緩衝區,它會檢查創建緩衝區的條件,但是我如何通過被動擴展來做到這一點。 任何幫助將不勝感激。 關於

感謝布蘭登的幫助。這是我用於測試的主要程序。它不反應的擴展問題working.As我是新能的方式我測試

namespace TestReactive 
{ 
class Program 
{ 
    static int abc = 1; 
    static void Main(string[] args) 
    { 
     Subject<AEvent> memberAdded = new Subject<AEvent>(); 
     //ISubject<AEvent, AEvent> syncedSubject = new ISubject<AEvent, AEvent>(); 
     var timer = new Timer { Interval = 5 }; 

     timer.Enabled = true; 
     timer.Elapsed += (sender, e) => MyElapsedMethod(sender, e, memberAdded); 

     var bc = memberAdded.Subscribe(); 
     var cdc = memberAdded.GroupBy(e => e.parent) 
    .SelectMany(parentGroup => 
    { 
     var children = parentGroup.Publish().RefCount(); 
     var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0); 
     var timer1 = Observable.Timer(TimeSpan.FromSeconds(1)); 
     var activeCount = children.TakeUntil(timer1).Count(); 

     return Observable.Amb(activeCount, inactiveChild) 
      .Select(count => new { ParentName = parentGroup.Key, Count = count }); 

    }); 

     Observable.ForEachAsync(cdc, x => WriteMe("Dum Dum " + x.ParentName+x.Count)); 
     // group.Dump("Dum"); 


     Console.ReadKey(); 
    } 

    static void WriteMe(string sb) 
    { 

     Console.WriteLine(sb); 
    } 
    static void MyElapsedMethod(object sender, ElapsedEventArgs e, Subject<AEvent> s) 
    { 
     AEvent ab = HelperMethods.GetAlarm(); 
     Console.WriteLine(abc + " p =" + ab.parent + ", c = " + ab.name + " ,s = " + ab.state); 
     s.OnNext(ab); 

    } 

} 

}

public static AEvent GetAlarm() 
    { 
     if (gp> 4) 
      gp = 1; 
     if (p > 4) 
      p = 1; 
     if (c > 4) 
      c = 1; 
     AEvent a = new AEvent(); 
     a.parent = "P" + gp + p; 
     a.name = "C" + gp + p + c; 
     if (containedKeys.ContainsKey(a.name)) 
     { 
      a.state = containedKeys[a.name]; 
      if (a.state == 1) 
       containedKeys[a.name] = 2; 
      else 
       containedKeys[a.name] = 1; 

     } 
     else 
     { 
      containedKeys.TryAdd(a.name, 1); 

     } 
     gp++; p++; c++; 

     return a; 

    } 

那麼這種方法,產生在每個刻度爲父事件。它爲State = 1的父P11,P22,P33,P44生成事件,然後生成State = 2的父P11,P22,P33,P44的事件。我使用Observable.ForEach打印結果,我看到它的存在所謂的4倍,之後其分毫,它像消除羣組的是沒有發生

回答

0

像....

source.GroupBy(e => GetParent(e.name)) 
     .SelectMany(parentGroup => 
     { 
      var children = parentGroup.Publish().RefCount(); 
      var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0); 
      var timer = Observable.Timer(TimeSpan.FromMinutes(2)); 
      var activeCount = children.TakeUntil(timer).Count(); 

      return Observable.Amb(activeCount, inactiveChild) 
       .Select(count => new { ParentName = parentGroup.Key, Count = count }; 
     }); 

這會給你的序列{ParentName,計數}對象。

+0

你好布蘭登,謝謝你的幫助。我正在測試它。我創建了一個主題 user1874472 2013-05-09 17:16:57

2

假設每個組兩分鐘的緩衝應該在兩分鐘後,只要該組的第一個事件被認爲是打開和關閉或零狀態被認爲是,那麼我認爲以下工作:

public static IObservable<EventCount> EventCountByParent(
    this IObservable<Event> source, IScheduler scheduler) 
{ 
    return Observable.Create<EventCount>(observer => source.GroupByUntil(
     evt => GetParent(evt.Name), 
     evt => evt, 
     group => 
     @group.Where(evt => evt.State == 2) 
       .Merge(Observable.Timer(
        TimeSpan.FromMinutes(2), scheduler).Select(_ => Event.Null))) 
       .SelectMany(
        go => 
        go.Aggregate(0, (acc, evt) => (evt.State == 2 ? 0 : acc + 1)) 
       .Select(count => new EventCount(go.Key, count))).Subscribe(observer)); 
} 

隨着EVENTCOUNT(實現平等覆蓋用於測試)爲:

public class EventCount 
{ 
    private readonly string _name; 
    private readonly int _count; 

    public EventCount(string name, int count) 
    { 
     _name = name; 
     _count = count; 
    } 

    public string Name { get { return _name; } } 
    public int Count { get { return _count; } } 

    public override string ToString() 
    { 
     return string.Format("Name: {0}, Count: {1}", _name, _count); 
    } 

    protected bool Equals(EventCount other) 
    { 
     return string.Equals(_name, other._name) && _count == other._count; 
    } 

    public override bool Equals(object obj) 
    { 
     if (ReferenceEquals(null, obj)) return false; 
     if (ReferenceEquals(this, obj)) return true; 
     if (obj.GetType() != this.GetType()) return false; 
     return Equals((EventCount) obj); 
    } 

    public override int GetHashCode() 
    { 
     unchecked 
     { 
      return ((_name != null ? _name.GetHashCode() : 0)*397)^_count; 
     } 
    } 
} 

和事件爲:

public class Event 
{ 
    public static Event Null = new Event(string.Empty, 0); 

    private readonly string _name; 
    private readonly int _state; 

    public Event(string name, int state) 
    { 
     _name = name; 
     _state = state; 
    } 

    public string Name { get { return _name; } } 
    public int State { get { return _state; } } 
} 

我做了一個快速(即不詳盡)用Rx-Testing測試:

public class EventCountByParentTests : ReactiveTest 
{ 
    private readonly TestScheduler _testScheduler; 

    public EventCountByParentTests() 
    { 
     _testScheduler = new TestScheduler(); 
    } 

    [Fact] 
    public void IsCorrect() 
    { 
     var source = _testScheduler.CreateHotObservable(
      OnNext(TimeSpan.FromSeconds(10).Ticks, new Event("A", 1)), 
      OnNext(TimeSpan.FromSeconds(20).Ticks, new Event("B", 1)), 
      OnNext(TimeSpan.FromSeconds(30).Ticks, new Event("A", 1)), 
      OnNext(TimeSpan.FromSeconds(40).Ticks, new Event("B", 1)), 
      OnNext(TimeSpan.FromSeconds(50).Ticks, new Event("A", 1)), 
      OnNext(TimeSpan.FromSeconds(60).Ticks, new Event("B", 2)), 
      OnNext(TimeSpan.FromSeconds(70).Ticks, new Event("A", 1)), 
      OnNext(TimeSpan.FromSeconds(140).Ticks, new Event("A", 1)), 
      OnNext(TimeSpan.FromSeconds(150).Ticks, new Event("A", 1))); 

     var results = _testScheduler.CreateObserver<EventCount>(); 

     var sut = source.EventCountByParent(_testScheduler).Subscribe(results); 

     _testScheduler.Start(); 

     results.Messages.AssertEqual(
      OnNext(TimeSpan.FromSeconds(60).Ticks, new EventCount("B", 0)), 
      OnNext(TimeSpan.FromSeconds(130).Ticks, new EventCount("A", 4)), 
      OnNext(TimeSpan.FromSeconds(260).Ticks, new EventCount("A", 2))); 
    } 
}