2015-10-21 47 views
2

我有一個項目,我需要每隔10秒發送一個狀態消息,除非在此期間有更新。意思是,每次有更新時,定時器都會重置。無效擴展定時器/間隔復位

var res = Observable 
    .Interval(TimeSpan.FromSeconds(10)) 
    .Where(_ => condition); 

res.Subscribe(_ => Console.WriteLine("Status sent.")); 

現在我知道「Where」只適用於計時器結束時,所以它沒有幫助。但是,我想知道是否有辦法重置Interval;或者重複使用Timer()。

回答

3

這是很容易實現使用標準的Rx運營商。

從你的問題中不清楚的是什麼是「更新」。我會假設你有某種可觀察的事件會引發每次更新,或者你可以創建一個主題,當有更新時你會調用.OnNext(...)。如果沒有可觀察的更新,很難知道何時重置計時器。

所以這裏的代碼:

var update = new Subject<bool>(); 

var res = 
    update 
     .Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0))) 
     .Switch(); 

res 
    .Subscribe(_ => Console.WriteLine("Status sent.")); 

update.OnNext(true); 

res查詢現在等待,直到它從update的值,然後選擇一個新Observable.Interval。這意味着Select之後的類型是IObservable<IObservable<long>>,因此需要.Switch()才能將其轉換爲IObservable<long>.Switch()通過只傳遞最近觀察到的可觀察值和處置任何以前的可觀察值來做到這一點。換句話說,對於每個update,啓動一個新的計時器並取消前一個計時器。這意味着如果您的更新發生頻率超過10秒,那麼定時器將永遠不會啓動。

現在,如果res觀察到的是在自己的權利的更新,那麼你可以這樣做:

res 
    .Subscribe(_ => 
    { 
     update.OnNext(true); 
     Console.WriteLine("Status sent."); 
    }); 

這很好 - 它仍然有效,但對於每個定時器發射res將創建一個新的計時器。這意味着任何依賴於你的可觀察/主題的東西仍然會正常工作。

+0

很乾淨的實現。如果狀態在一段時間內沒有改變,我使用它作爲「註銷」計時器,該計時器依賴Redux存儲(即流)來觸發註銷。這種事情讓Rx非常有趣。 –

-1

在您閱讀下面的代碼,這裏有一個旅的需求下降:

1)執行,如果有10秒(定時器)

2無狀態更新)如果有一個函數狀態更新,定時器將被重置

3)如果函數正在執行,則定時器將停止,然後在函數完成執行時恢復。 (防止同時執行兩次函數,假設該函數執行的時間比計時器的運行時間多

4)定時器上的任何更改都必須是原子的(一次只能有一個線程訪問它!)

static object RequestStatusLock = new object(); 
    ... 
    System.Threading.Timer timer = new System.Threading.Timer(10000); //10second 
    timer.Elapsed += timer_elapsed; //timer elapsed will be an event where you will do request status. 

void KickWatchDog() 
{ 
    if(Monitor.TryEnter(RequestStatusLock)) 
    { 
     //calling Stop() and Start() will reset and restart the timer. 
     timer.Stop(); 
     timer.Start(); 
     Monitor.Exit(RequestStatusLock) 
     } 
} 

void timer_elapsed(object sender,EventArgs e) 
{ 
    if(Monitor.TryEnter(RequestStatusLock) 
    { 
    //if it can get into this section means that no update from the last 10 second! 
    timer.Stop(); 
    RequestStatus(); 
    timer.Start(); 
    Monitor.Exit(RequestStatusLock); 
    } 
} 

有一點要記住的是,這是最好有周圍當定時器經過你執行功能的try/catch塊。原因是,如果它發生故障,你會想要釋放鎖(如果你想讓程序繼續下去)。

注:有2型定時器,System.Threading.TimerSystem.Timer(他們是相同的!),這就是爲什麼我明確地聲明爲System.Threading.Timer

+0

我的解決方案存在的問題是我無法對其進行單元測試。監視器和計時器是不可模仿的。因此,如果我有一個單元測試,我需要做一個Thread.Sleep(10000)來確保我的狀態在10秒後發送。另外,我的問題是關於做一個帶有反應x的可重新設定的10秒定時器(因此反應x代碼使用和system.reactive標記)。 – joniboy

0

我想你也可以在這裏使用Throttle。 Throttle的目的不是讓元素在給定的時間範圍內收到另一個元素。因此,在您的情況下,如果在10秒內收到更新消息,則不要發送狀態。請參閱下面的單元測試,其中使用200個刻度作爲節流期。

[TestMethod] 
    public void Publish_Status_If_Nothing_Receieved() 
    { 
     //Arrange 
     var scheduler = new TestScheduler(); 
     var statusObserver = scheduler.CreateObserver<Unit>(); 
     var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3), 
      OnNext(700, 4)); 

     var waitTime = TimeSpan.FromTicks(200); 

     //Act 
     updateStream.Throttle(waitTime, scheduler) 
      .Select(_ => Unit.Default) 
      .Subscribe(statusObserver); 

     //Verify no status received 
     scheduler.AdvanceTo(100); 
     Assert.AreEqual(0, statusObserver.Messages.Count); 

     //Verify no status received 
     scheduler.AdvanceTo(200); 
     Assert.AreEqual(0, statusObserver.Messages.Count); 

     //Assert status received 
     scheduler.AdvanceTo(400); 
     statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default)); 

     //Verify no more status received 
     scheduler.AdvanceTo(700); 
     statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default)); 
    }