2017-10-18 66 views
0

我有越來越與我執行後續XHR請求,像這樣的陣列的XHR請求:如何成對緩衝觀察值並通過對執行它們?

const Rx = require('rxjs/Rx'); 
const fetch = require('node-fetch'); 

const url = `url`; 

// Get array of tables 
const tables$ = Rx.Observable 
    .from(fetch(url).then((r) => r.json())); 

// Get array of columns 
const columns$ = (table) => { 
    return Rx.Observable 
    .from(fetch(`${url}/${table.TableName}/columns`).then(r => r.json())); 
}; 

tables$ 
    .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))  
    .subscribe(val => console.log(val)); 

我想執行在chuncks列請求,以便請求沒有被髮送到服務器立刻。

這太問題是有些在同一個方向,但不是完全:Rxjs: Chunk and delay stream?

現在,我想是這樣的:

tables$ 
    .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$))) 
    .flatMap(e => e) 
    .bufferCount(4) 
    .executeTheChunksSerial(magic) 
    .flatMap(e => e) 
    .subscribe(val => console.log(val)); 

但我不能換我圍​​繞着如何執行該塊頭在系列...

+0

看起來你有一個以上的高階可觀察這使得這個很難理解。也許你可以使用'.concatMap(buffered => Observable.forkJoin(buffered))'而不是'.executeTheChunksSerial(magic)',但我不確定。 – martin

回答

2

可以利用的mergeMapconcurrency參數來獲得最大的X同時請求到服務器:

const getTables = Promise.resolve([{ tableName: 'foo' },{ tableName: 'bar' },{ tableName: 'baz' }]); 
 
const getColumns = (table) => Rx.Observable.of('a,b,c') 
 
    .do(_ => console.log('getting columns for table: ' + table)) 
 
    .delay(250); 
 
     
 
Rx.Observable.from(getTables) 
 
    .mergeAll() 
 
    .mergeMap(
 
    table => getColumns(table.tableName), 
 
    (table, columns) => ({ table, columns }), 
 
    2) 
 
    .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.js"></script>