2017-08-09 58 views
3

問題:我想創建Rxjs組合(函數鏈),它將導致從一個Observable中緩衝值,直到發生某些事件,然後同步發送所有緩衝值,然後緩衝直到下一個事件。RxJS:在某個事件發生後緩衝區發出

我使用這個函數來收集所有的http請求,這些請求必須等到我的應用程序進行授權調用。然後運行所有這些請求。 (它在Angular4 HttpClient攔截器中實現),這是我的用例,但我通常尋求如何創建這種rx鏈的解決方案。

爲什麼Rxjs緩衝區不受歡迎。從我讀取和測試的緩衝區中,要麼需要精確的時間幀,要麼在獲取調度程序而不是時間作爲參數的情況下,在檢測到最後一個調度程序的「事件」傳播後重新訂閱調度程序。我希望它像這樣工作:當出現第一個請求時,我開始緩衝,然後訂閱調度程序,在調度程序發出後,停止緩衝,重新發送所有緩衝值,並等到下一個新請求再次開始緩衝並且再次啓動調度程序。

我現在的解決方案是使用輔助對象,這就是未定義或我觀察到,與代碼大致如下:

private observable: Observable<boolean>; 

makeRequest(): Observable<boolean> { 
    if (this.observable !== void 0) { 
     return this.observable; 
    } else { 
     this.observable = this.authenticationReuqest() 
      .share() 
      .finally(() => this.observable = void 0); 

     return this.observable; 
    } 
} 

這樣一來我有點緩衝答應我的請求,由maiking他們.delay(),直到同樣的多播可觀察發射,並且在它發射之後,我只是將其清理乾淨(雖然在完成或錯誤之後不需要取消訂閱,因爲它最終清除了)。

如果任何人有一個想法或模式如何用純Rxjs替換此解決方案,我intrested。我有些感覺,儘管我無法得到確切的解決方案,但是Buffer和Zip的一些組合可以實現。

感謝 托馬斯

+0

'buffer()'運算符不會重新訂閱它的通知程序。我認爲你在這裏描述的是「緩衝」操作符現在是如何工作的。 – martin

+0

@martin如文檔http://reactivex.io/documentation/operators/buffer中所述。html緩衝區時需要一些描述時間窗口的參數(無論是時間窗口還是其他可觀察窗口)都有一個接一個的時間框架。 我的第二個可觀察的是Http請求。我想在等待這個請求時緩衝。當我得到第一個無緩衝的值和緩衝區值直到這個http請求結束時,我想發送這個請求。 –

+0

我認爲緩衝區允許做的是有一個由Observable定義的時間框架,但它不允許緩衝源和這個時間框架之間的任何交互提供可觀察的。 如果您只是使用緩衝區功能簡單實現它的確切示例,我將非常樂意看到它。 –

回答

0

你應該使用bufferWhen:它,直到第二個發射從可觀察到的緩存值。

+0

不幸的是,當我得到新的(無緩衝的)請求後,在開箱即可重新訂閱第二個可觀察值時,我將不得不設置任意時間,之後我重新訂閱第二個,或者在完成先前調用後立即重新訂閱。我尋找一個懶惰的模式,每個緩衝區只調用第二個緩衝區,並且只在需要時才調用。 –

0

我遇到了一模一樣的問題,我有要事來在這樣

enter image description here

我開球加載事件早,所以我們並不需要等待。但問題是,只有在一點之後(「rdy」代表準備就緒),我想要處理這些事件。所以準備好的活動需要先到。我需要保持這些值,直到準備就緒並重新發送它們。像這樣

enter image description here

這是我做的,我用multicast共享相同的認購下游。在篩選第一個就緒事件的選擇器函數中創建一個可用的可觀察值。然後合併多播事件的ready observable和buffer,取一個,並將switchMap從數組返回值。最後連接多播事件。下面的代碼

const events = [ 
    'ev1', 
    'ev2', 
    'ready', 
    'ev3', 
    'ev4', 
    'ev5' 
] 
Rx.Observable 
    .interval(500) 
    .take(events.length) 
    .map(i => events[i]) 
    .multicast(
    () => new Rx.Subject(), 
     events$ => { 
      const ready$ = events$ 
      .filter(event => event === 'ready') 
      .take(1) 
      return Rx.Observable 
      .merge(
       ready$, 
       events$ 
        .buffer(ready$) 
        .take(1) 
        .switchMap(events => Rx.Observable.from(events)) 
      ) 
       .concat(events$) 
     } 
) 

或者你可以找到它,並在RxViz運行在這裏https://rxviz.com/v/j8nNpe8N 這不是很漂亮,但它的工作原理。我還在尋找更好的方法來做到這一點,任何想法?