我正在嘗試創建一個可以暫停的Observable,以便物品停止通過可觀察物體,直到它變爲未暫停爲止。如何暫停事件流經可觀察對象?
在這一點上,我希望它恢復處理所有未處理的項目。我的數據源來自外部類,所以我有什麼風這樣看:
class Agent {
val publisher = PublishSubject.create<Event>()
val subscription = createSubscription()
fun trackEvent(e: Event) {
publisher.onNext(e)
}
fun pause() {
// ???
}
fun resume() {
// ???
}
private fun createSubscription(): Subscription {
return publisher
.map { stringify(it) }
.buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first.
.map { /* create HttpPost request */ }
.flatMap { /* send request to server */ }
.subscribe { println("Received response: $it") }
}
}
什麼我的目標是,pause
功能將從甚至將服務器停止的事件(但會堅持他們直到最終resume
)。在發生點resume
時,我們會發送事件。 (顯然我們會加入一些額外的背壓幫助,以防止在暫停狀態下事件太多
我試過各種各樣的緩衝和窗口使用來完成這項工作,但它實際上從未暫停可觀察到的相反,兩種情況之一發生:
- 事件中完全停止(在退訂,過濾器等的情況下)
- 事件,好象什麼流經已發生。
有什麼我可以做,以支持這個用例?或者我是否應該在預期上述兩個結果之一會發生什麼?
有趣,謝謝!明天我會試試這個,但它絕對看起來很有希望。 – ashays
經過測試後,它看起來像'isActive'只會發出一次「真」(而不是每個緩衝區),直到再次調用暫停和取消暫停爲止。這意味着所有事情都會繼續緩衝,並且更像是有一種「流失」,只要我致電簡歷就會流失。它看起來像根據你所說的,我應該有一個幾乎「恆定」價值的東西被推到「真」或「假」,這取決於它的工作狀態。 – ashays
@ashays查看更新後的版本,我錯過了添加修改的'trackEvent'方法。 – miensol