2017-10-19 168 views
2

我有一個數據流,具有快速傳入的數據。我想通過保持順序將它們插入到數據庫中。我有一個數據庫,它返回一個承諾,在插入成功時解決。RxJs緩衝區,直到數據庫插入(承諾)

我想創建一個Rx流,緩衝新數據,直到緩衝數據被插入。

我該怎麼做?

+0

這是什麼問題?有'buffer','bufferToggle'或'bufferWhen'運算符。 – martin

+0

問題是我不知道如何使用它們。試圖找出,但不知道如何。 –

+1

使用'concatMap',從項目函數返回承諾。 'concatMap'會爲你做緩衝,但是RxJS沒有背壓,所以如果你的數據到達的速度比你寫的速度快,你會耗盡內存。 – cartant

回答

2

我相信要得到你想要的東西,你需要創建自己的操作符。從RxJS打破略,你可以得到類似的信息(警告,沒有測試)...

export class BusyBuffer<T> { 
    private itemQueue = new Subject<T>(); 
    private bufferTrigger = new Subject<{}>(); 
    private busy = false; 

    constructor(consumerCallback: (items: T[]) => Promise<void>) { 
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => { 
     this.busy = true; 
     consumerCallback(items).then(() => { 
     this.busy = false; 
     this.bufferTrigger.next(null); 
     }); 
    }); 
    } 

    submitItem(item: T) { 
    this.itemQueue.next(item); 
    if(!busy) { 
     this.bufferTrigger.next(null); 
    } 
    } 

} 

然後可以用作

let busyBuffer = new BusyBuffer<T>(items => { 
    return database.insertRecords(items); 
}); 
items.subscribe(item => busyBuffer.submitItem(item)); 

這不完全是純粹的反應雖然有人可能能夠想出更好的東西。

+2

謝謝!我創造了同樣的,但我希望有人能拿出一個純粹的反應解決方案:) –

+0

沒有汗水,祝你好運。我正在考慮從數據庫中獲取某種繁忙/免費的信號,並將其傳回緩衝區方法,但如果數據庫沒有做任何事情,您還必須添加緩衝區應該立即發出的邏輯。 – Pace

+0

如果你想要更好的東西,不要接受我的答案。我不介意。 – Pace