前提
我試圖找到正確的方式提前終止了一系列的Node.js管道流(管道)的:有時我要在流完成之前正常中止流。具體而言,我主要處理objectMode: true
和非本地平行流,但這應該不重要。有道unpipe一個streams2管道和清空(不只是沖水)
問題是,當我unpipe
管道,數據保持在每個流的緩衝器,並且drain
編對於大多數中間流(例如,Readable
/Transform
),這個可能是,但最後的Writable
仍然排到其寫入目標(例如文件或數據庫或套接字或w/e)。如果緩衝區包含數百或數千個需要耗費大量時間的塊,這可能會產生問題。我希望它立即停止,即不排水;爲什麼浪費週期和內存對數據無關緊要?
根據我走的路線,我收到「寫完後」錯誤,或者當流找不到現有管道時發生異常。
問題
什麼是有道優雅地殺死流的形式a.pipe(b).pipe(c).pipe(z)
管道?
解決方案?
我已經提出瞭解決的辦法是3步:
unpipe
管道中的每個流以相反的順序- 實現
Writable
end
每個實現Writable
流空每個流的緩衝器
說明整個過程的一些僞代碼:
var pipeline = [ // define the pipeline
readStream,
transformStream0,
transformStream1,
writeStream
];
// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
if (!tmpBuildStream) {
tmpBuildStream = stream;
continue;
}
tmpBuildStream = lastStream.pipe(stream);
});
// sleep, timeout, event, etc...
// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
if (!tmpTearStream) {
tmpTearStream = stream;
continue;
}
tmpTearStream = stream.unpipe(tmpTearStream);
});
// empty and end the pipeline
pipeline.forEach(function(stream) {
if (typeof stream._writableState === 'object') { // empty
stream._writableState.length -= stream._writableState.buffer.length;
stream._writableState.buffer = [];
}
if (typeof stream.end === 'function') { // kill
stream.end();
}
});
我真的很擔心的stream._writableState
使用和修改內部buffer
和length
特性(_
意味着私有財產)。這看起來像一個黑客。還請注意,由於我是管道系統,因此我們無法解決問題(基於我從IRC收到的建議),如pause
和resume
。
我也把一個可運行的版本(相當草率),你可以從GitHub抓取:https://github.com/zamnuts/multipipe-proto(GIT克隆,NPM安裝,查看自述,NPM開始)
我也有趣如何搶佔並停止對效率的巨大(多千兆字節)的數據流。 (例如,您只想讀取標題) – user949300 2015-03-11 21:24:03
據我所知,沒有官方解決方案來清除寫入流。我能想到的唯一解決方案是編寫一個自定義轉換流,您可以在管道中的寫入流之前插入它。這個流將實現它自己的緩衝行爲,接管來自寫入流的責任。因爲我們擁有這個流的緩衝機制,所以我們可以構建一個方法來清除它,而不是訴諸黑客。寫入流應該得到一個非常低的highWaterMark,以便在我們終止時最小化要寫入的數據。 – 2015-03-17 19:47:45
@JasperWoudenberg我認爲你在那裏。此外,自從編寫這個問題以來,IIRC就已經有解決這個緩衝怪癖的發佈。 – zamnuts 2015-03-18 18:08:20