2017-02-13 1601 views
0

我想了解更多RxJS概念。這是我目前正在努力解決的問題。我想通過異步調用將訪問抽象爲數據庫,並且我想同步訪問。使用RxJS同步異步操作

我可以有一個動作流,
- 即做異步調用數據庫
- 我想下一個動作被延遲,直到前面的動作完成
- 操作的調用者應得到的觀察到的,異步操作的結果。

例子:

類的用戶調用措施1:讀取數據庫項目,計算下一個狀態(例如增量字段),寫信給DB
然後..類的
用戶調用的下一步行動(措施2),但行動1仍在進行中。
措施2:讀DB(不得的措施1前開始:寫操作完成)

這怎麼能與RxJS +打字稿做些什麼呢?

弗蘭克

/////////////////////////////////

同時,我有這個代碼:

import * as Rx from 'rxjs'; 

var actionQueue = new Rx.Subject<() => Rx.Observable<any>>(); 
actionQueue 
    .concatMap(v => v()) 
    .subscribe(v => {}); 

// example action with result type number 
function action1 (v : number) : Rx.Observable<number> { 
    console.log(':: action1: ', v); 
    var res = new Rx.Subject<number>(); 
    actionQueue.next(() => { 
    console.log('>> action1: ', v); 
    setTimeout(()=>{ 
     console.log('<< action1: ', v); 
     res.next(v); 
     res.complete(); 
    }, 500); 
    return res; 
    }); 
    return res; 
} 

// some actions enqueue now, after 700+2500ms 
action1(11).subscribe(v => console.log('XX action1: ', v)); 
action1(22).subscribe(v => console.log('XX action1: ', v)); 
action1(33).subscribe(v => console.log('XX action1: ', v)); 

setTimeout(()=>{ 
    action1(44).subscribe(v => console.log('XX action1: ', v)); 
}, 700); 

setTimeout(()=>{ 
    action1(55).subscribe(v => console.log('XX action1: ', v)); 
}, 2500); 

輸出顯示了它的順序的東西。
由於typescript/js noob ...這個代碼有缺陷嗎?有沒有更優雅的方式?

弗蘭克

回答

0

如何使用delayWhen()操作?

// Observable wrapping action1. 
const obsAction1 = Observable.create(observer => { 
    // Read DB item 
    // Calculate next state 
    // Write to DB 
    // Then: 
    observer.complete(); 
}); 

// Private observable wrapping action2. 
// DO NOT subscribe to it directly. 
const _obsAction2 = Observable.create(observer => { 
    // Read DB 
    // Then: 
    observer.complete(); 
}); 

// Public observable wrapping action2 AND delayed by action 1. 
// This is what the client code should subscribe to. 
const obsAction2 = _obsAction2.delayWhen(obsAction1); 

現在代碼消耗可觀:

obsAction1.subscribe(val => console.log(val)); 

// Values will be received only when `obsAction1` emits or completes. 
obsAction2.subscribe(val => console.log(val));