我想你也可以在這裏使用Throttle。 Throttle的目的不是讓元素在給定的時間範圍內收到另一個元素。因此,在您的情況下,如果在10秒內收到更新消息,則不要發送狀態。請參閱下面的單元測試,其中使用200個刻度作爲節流期。
[TestMethod]
public void Publish_Status_If_Nothing_Receieved()
{
//Arrange
var scheduler = new TestScheduler();
var statusObserver = scheduler.CreateObserver<Unit>();
var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3),
OnNext(700, 4));
var waitTime = TimeSpan.FromTicks(200);
//Act
updateStream.Throttle(waitTime, scheduler)
.Select(_ => Unit.Default)
.Subscribe(statusObserver);
//Verify no status received
scheduler.AdvanceTo(100);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Verify no status received
scheduler.AdvanceTo(200);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Assert status received
scheduler.AdvanceTo(400);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
//Verify no more status received
scheduler.AdvanceTo(700);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
}
很乾淨的實現。如果狀態在一段時間內沒有改變,我使用它作爲「註銷」計時器,該計時器依賴Redux存儲(即流)來觸發註銷。這種事情讓Rx非常有趣。 –