我有一個數據流,具有快速傳入的數據。我想通過保持順序將它們插入到數據庫中。我有一個數據庫,它返回一個承諾,在插入成功時解決。RxJs緩衝區,直到數據庫插入(承諾)
我想創建一個Rx流,緩衝新數據,直到緩衝數據被插入。
我該怎麼做?
我有一個數據流,具有快速傳入的數據。我想通過保持順序將它們插入到數據庫中。我有一個數據庫,它返回一個承諾,在插入成功時解決。RxJs緩衝區,直到數據庫插入(承諾)
我想創建一個Rx流,緩衝新數據,直到緩衝數據被插入。
我該怎麼做?
我相信要得到你想要的東西,你需要創建自己的操作符。從RxJS打破略,你可以得到類似的信息(警告,沒有測試)...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
然後可以用作
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
這不完全是純粹的反應雖然有人可能能夠想出更好的東西。
這是什麼問題?有'buffer','bufferToggle'或'bufferWhen'運算符。 – martin
問題是我不知道如何使用它們。試圖找出,但不知道如何。 –
使用'concatMap',從項目函數返回承諾。 'concatMap'會爲你做緩衝,但是RxJS沒有背壓,所以如果你的數據到達的速度比你寫的速度快,你會耗盡內存。 – cartant