2016-05-31 56 views
2

我正在嘗試創建一個可以暫停的Observable,以便物品停止通過可觀察物體,直到它變爲未暫停爲止。如何暫停事件流經可觀察對象?

在這一點上,我希望它恢復處理所有未處理的項目。我的數據源來自外部類,所以我有什麼風這樣看:

class Agent { 
    val publisher = PublishSubject.create<Event>() 
    val subscription = createSubscription() 

    fun trackEvent(e: Event) { 
     publisher.onNext(e) 
    } 

    fun pause() { 
     // ??? 
    } 

    fun resume() { 
     // ??? 
    } 

    private fun createSubscription(): Subscription { 
     return publisher 
       .map { stringify(it) } 
       .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first. 
       .map { /* create HttpPost request */ } 
       .flatMap { /* send request to server */ } 
       .subscribe { println("Received response: $it") } 
    } 
} 

什麼我的目標是,pause功能將從甚至將服務器停止的事件(但會堅持他們直到最終resume)。在發生點resume時,我們會發送事件。 (顯然我們會加入一些額外的背壓幫助,以防止在暫停狀態下事件太多

我試過各種各樣的緩衝和窗口使用來完成這項工作,但它實際上從未暫停可觀察到的相反,兩種情況之一發生:

  1. 事件中完全停止(在退訂,過濾器等的情況下)
  2. 事件,好象什麼流經已發生。

有什麼我可以做,以支持這個用例?或者我是否應該在預期上述兩個結果之一會發生什麼?

回答

1

訣竅是使用另一個BehaviorSubject作爲額外的緩衝關閉事件:

val publisher = PublishSubject.create<Event>() 
fun trackEvent(e: Event) { 
    publisher.onNext(e) 
    isPaused.onNext(isPaused.value) 
} 

val isActive = BehaviorSubject.create(true) 
fun pause() { 
    isActive.onNext(false) 
} 
fun resume() { 
    isActive.onNext(true) 
} 

private fun createSubscription(): Subscription { 
    return publisher 
      .buffer(10L, TimeUnit.SECONDS, 500) // -> Observable<List<Event>> 
      .buffer({ isActive.filter { it } }) // -> Observable<List<List<Event>>> 
      .flatMap { Observable.from(it) } // -> Observable<List<Event>> 
      .map { /* create HttpPost request */ } 
      .flatMap { /* send request to server */ } 
      .subscribe { println("Received response: $it") } 
} 

第一buffer通話會使傳入事件到與指定大小或時間過去之後。第二個buffer將關閉當前存儲桶關於observable發出的事件,指示Agent未暫停(isActive.filter { it })。 isActive爲每個事件發出一個值,並且因爲isActiveBehaviorSubject它將向每個新用戶發出其最後的值。那就是每由第一個buffer發出,它會立即繼續或等到Agent恢復。

+0

有趣,謝謝!明天我會試試這個,但它絕對看起來很有希望。 – ashays

+0

經過測試後,它看起來像'isActive'只會發出一次「真」(而不是每個緩衝區),直到再次調用暫停和取消暫停爲止。這意味着所有事情都會繼續緩衝,並且更像是有一種「流失」,只要我致電簡歷就會流失。它看起來像根據你所說的,我應該有一個幾乎「恆定」價值的東西被推到「真」或「假」,這取決於它的工作狀態。 – ashays

+0

@ashays查看更新後的版本,我錯過了添加修改的'trackEvent'方法。 – miensol