時間跨度我有以下類型的未來事件的熱氣流:無擴展GROUP BY,獨特的BufferWindow直到與消除
Event
{
string name;
int state ; // its 1 or 2 ie active or unactive
}
有提供給定名稱的父名稱的函數 - 字符串的getParent(串名字)
我需要緩衝每個父母的事件2分鐘,如果在這2分鐘內,我recv任何事件的孩子狀態= 2爲給定的父母,這個緩衝區應該取消,應該輸出0,否則我得到事件回顧計數。
我知道我必須使用GroupBy分區,然後緩衝,然後計數,但我無法想到一種方式,我創建緩衝區,這是獨特的每個父母,我雖然使用獨特但這並不解決問題,因爲我只是不想創建緩衝區,直到父活躍(因爲一旦父緩衝區被取消或2分鐘結束,父緩衝區可以再次創建) 所以我明白我需要創建一個自定義緩衝區,它會檢查創建緩衝區的條件,但是我如何通過被動擴展來做到這一點。 任何幫助將不勝感激。 關於
感謝布蘭登的幫助。這是我用於測試的主要程序。它不反應的擴展問題working.As我是新能的方式我測試
namespace TestReactive
{
class Program
{
static int abc = 1;
static void Main(string[] args)
{
Subject<AEvent> memberAdded = new Subject<AEvent>();
//ISubject<AEvent, AEvent> syncedSubject = new ISubject<AEvent, AEvent>();
var timer = new Timer { Interval = 5 };
timer.Enabled = true;
timer.Elapsed += (sender, e) => MyElapsedMethod(sender, e, memberAdded);
var bc = memberAdded.Subscribe();
var cdc = memberAdded.GroupBy(e => e.parent)
.SelectMany(parentGroup =>
{
var children = parentGroup.Publish().RefCount();
var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0);
var timer1 = Observable.Timer(TimeSpan.FromSeconds(1));
var activeCount = children.TakeUntil(timer1).Count();
return Observable.Amb(activeCount, inactiveChild)
.Select(count => new { ParentName = parentGroup.Key, Count = count });
});
Observable.ForEachAsync(cdc, x => WriteMe("Dum Dum " + x.ParentName+x.Count));
// group.Dump("Dum");
Console.ReadKey();
}
static void WriteMe(string sb)
{
Console.WriteLine(sb);
}
static void MyElapsedMethod(object sender, ElapsedEventArgs e, Subject<AEvent> s)
{
AEvent ab = HelperMethods.GetAlarm();
Console.WriteLine(abc + " p =" + ab.parent + ", c = " + ab.name + " ,s = " + ab.state);
s.OnNext(ab);
}
}
}
public static AEvent GetAlarm()
{
if (gp> 4)
gp = 1;
if (p > 4)
p = 1;
if (c > 4)
c = 1;
AEvent a = new AEvent();
a.parent = "P" + gp + p;
a.name = "C" + gp + p + c;
if (containedKeys.ContainsKey(a.name))
{
a.state = containedKeys[a.name];
if (a.state == 1)
containedKeys[a.name] = 2;
else
containedKeys[a.name] = 1;
}
else
{
containedKeys.TryAdd(a.name, 1);
}
gp++; p++; c++;
return a;
}
那麼這種方法,產生在每個刻度爲父事件。它爲State = 1的父P11,P22,P33,P44生成事件,然後生成State = 2的父P11,P22,P33,P44的事件。我使用Observable.ForEach打印結果,我看到它的存在所謂的4倍,之後其分毫,它像消除羣組的是沒有發生
你好布蘭登,謝謝你的幫助。我正在測試它。我創建了一個主題 –
user1874472
2013-05-09 17:16:57