2017-10-19 88 views
2

我有兩個可觀察:RxJs如何將兩個重疊的可觀察到的合併成一個

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-| 
-13--14--15--16--17--18--19-----20---------21--------------22------23--24--> 

第一個包含一些越來越多,但經過一段時間後停止(這些是來自數據庫的光標結果) 第二類是不斷涌現越來越多的人首先包含一些數字,但不要停止發射。 (這些都是新插入的數據到數據庫)

我想這兩個可觀察一下一個連續的觀察到這樣的:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24--> 

此觀察到的包含了每一個號碼只有一次,保持順序發光。

如何使用盡可能少的內存來解決問題?

+0

想知道你是怎麼得到這樣兩個重疊的觀測量流?你的問題是什麼意思? –

+1

我使用Rethinkdb。我在數據庫中有舊數據,它由光標和新插入的數據(由換頁傳送)發送。在我從光標讀取數據的同時,新插入的數據也由光標提供。這導致重疊 –

回答

2

我認爲最好的辦法在這裏是緩衝b $,直到$流達到b $,然後發出所有b $的緩衝項並切換到b $。事情是這樣的:

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'; 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'; 
 

 
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x)); 
 

 
const a$ = fromMarble(a).share(); 
 
const b$ = fromMarble(b).share(); 
 

 
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share(); 
 

 
const distinct$ = Rx.Observable.merge(
 
\t a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
 
\t b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'), 
 
\t b$.skipUntil(switchingSignal$).map(x => x + '(from b$)') 
 
); 
 

 
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>

2

您可以通過採取從(.concat)與第二流連接起來的第一流的所有元素,除了最新的一個(.last)之前(.skipWhile含)的元素

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15' 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24' 
 
const fromMarble = str => Rx.Observable.defer(() => { 
 
    console.log('side effect from subscribing to: ' + str); 
 
    return Rx.Observable.from(str.split('-').filter(v => v.length)); 
 
}); 
 

 
const a$ = fromMarble(a); 
 
const b$ = fromMarble(b); 
 

 
const distinct$ = Rx.Observable.concat(
 
    a$, 
 
    a$.last().switchMap(latest => 
 
    // .skipWhile + .skip(1) => skipWhile but inclusive 
 
    b$.skipWhile(v => v !== latest).skip(1) 
 
), 
 
); 
 

 
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

做到這一點另外,如果訂閱時有副作用(例如訂閱時 - 將創建新光標),則可以通過使用例如const a$ = fromMarble(a).shareReaplay()來爲所有訂戶共享該副作用。

你可以閱讀更多有關共享的副作用:老文檔中

+0

尼斯解決方案。但是,如果b $流在a流之後發出,則不同的$流將被不必要地延遲。不知道這是否是一個問題,但可以毫不拖延地解決。 – ZahiC

+0

很好的@ZahiC和另外一個問題是當'b $'是空的 - 你什麼也得不到。所以我修改了我的答案來處理這種情況。 –

+0

如果b $在$後發射,這肯定會解決延遲問題,但現在您可能訂閱b $的時間太晚(第一個流完成時,已訂閱串聯流)。如果b $訂閱需要時間(例如連接到數據庫),切換到b $將會變慢。您可以通過早點連接到b $來解決這個問題(hot Observable),但這樣可能會丟失一些物品。我將增加另一個解決方案以供參考 – ZahiC