2017-04-25 62 views
1

我有兩個對象流,即帳戶和餘額。根據特定條件在RxJs中加入兩個可觀察對象流

我需要合併(加入)兩個流根據idaccount_id

var accounts = Rx.Observable.from([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.from([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

什麼是預期:

var results = [ 
    { id: 1, name: 'account 1', balance: 100}, 
    { id: 2, name: 'account 2', balance: 200}, 
    { id: 3, name: 'account 3', balance: 300}, 
]; 

是與RxJs這可行嗎?

要清楚我知道如何用普通的js/lodash或類似的東西來做到這一點。在我的情況下,我從Angular Http Module獲得這些流,所以我問在這種情況下我是否可以從RxJs中受益

+1

待辦事項你想要結果,一旦兩個流都結束或建立它們隨着時間的推移? – Maxime

+0

如果您要發佈示例代碼,這將有助於更好地理解問題。如前所述,目前還不清楚爲什麼不直接在數組上進行連接,而不是從流內部進行連接。但無論如何,是的,可以做你提到的加入。參看http://stackoverflow.com/questions/43332674/rxjs-to-combine-attributes-from-triples-to-a-table/43333630#43333630其中顯示瞭如何迭代構建一個結構 – user3743222

+0

@Maxime,在我的情況下完成後,但是如果在流程中有辦法做到這一點,那麼也可以使用 – amd

回答

0

對於您的某條評論,您的示例是模擬來自Angular Http調用的流。

所以不是:

var accounts = Rx.Observable.from([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.from([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

我寧願說它是:

var accounts = Rx.Observable.of([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.of([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

爲什麼:from將逐一發出的每個項目,of會散發出整個數組我猜你的http響應是整個陣列。

這就是說,你可能想實現的是:

const { Observable } = Rx; 

// simulate HTTP requests 
const accounts$ = Rx.Observable.of([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' } 
]); 

const balances$ = Rx.Observable.of([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 } 
]); 

// utils 
const joinArrays = (accounts, balances) => 
    accounts 
    .map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance })); 

const findBalanceByAccountId = (balances, id) => 
    balances.find(balance => balance.account_id === id) || { balance: 0 }; 

const print = (obj) => JSON.stringify(obj, null, 2) 

// use forkJoin to start both observables at the same time and not wait between every request 
Observable 
    .forkJoin(accounts$, balances$) 
    .map(([accounts, balances]) => joinArrays(accounts, balances)) 
    .do(rslt => console.log(print(rslt))) 
    .subscribe(); 

輸出:

[ 
    { 
    "id": 1, 
    "name": "account 1", 
    "balance": 100 
    }, 
    { 
    "id": 2, 
    "name": "account 2", 
    "balance": 200 
    }, 
    { 
    "id": 3, 
    "name": "account 3", 
    "balance": 300 
    } 
] 

這裏有一個工作Plunkr:https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

編輯1: 在數組上編寫結果的工作可能不是性能的最佳想法,而不是返回數組,可能嘗試返回一個具有該鍵的帳戶ID。這樣,你可以簡單地刪除findBalanceByAccountId功能,並具有更快的應用程序(這裏只修改後的代碼)

const balances$ = Rx.Observable.of({ 
    1: { account_id: 1, balance: 100 }, 
    2: { account_id: 2, balance: 200 }, 
    3: { account_id: 3, balance: 300 } 
}); 

// utils 
const joinArrays = (accounts, balances) => 
    accounts 
    .map(account => Object.assign(
     {}, 
     account, 
     { balance: balances[account.id].balance } 
    )); 
+0

謝謝,但不幸的是,你的假設是不正確的:s,實際上,我將數據視爲流的對象'Observable '而不是單個數組的流'可觀察' – amd

0

可以使用的GroupBy操作者ID加入兩個項目:

Rx.Observable.merge(accounts, balances.map(({account_id, balance})=>({id: account_id, balance}))) 
.groupBy(accountInfo => accountInfo.id) 
.flatMap(accountInfo$ => accountInfo$.scan((acc, info) => Object.assign(acc, info), {}) 
    .filter(accountInfo => accountInfo.name && accountInfo.balance) 
    .take(1) 
).subscribe(console.log)