0
我有一個來自分叉進程的無限數據流。我希望這個流被一個模塊處理,有時我想複製這個流中的數據以便由不同的模塊處理(例如,監視數據流,但是如果發生任何有趣的事情,我想記錄下n個字節以便爲進一步的調查)。NodeJS流拆分
因此,讓我們假設以下情形:
- 我啓動該程序並開始消耗可讀流
- 2秒後,我想用不同的流讀取器來處理1秒的相同數據
- 一旦時間到了,我想關閉第二個消費者,但最初的消費者必須保持不動。
下面的代碼片段此:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
我在這裏的問題是,unpiping的STREAM2沒有得到它關閉。如果我的unpipe
命令後調用stream.end()
,然後將其與錯誤崩潰:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
我甚至試圖暫停源流,以幫助從第二氣流衝掃的緩衝區,但它沒有工作之一:
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function() {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
和以前一樣的錯誤(寫完後)。
如何解決問題?我最初的意圖是從一個點複製流數據,然後在一段時間後關閉第二個數據流。
Note: I tried to use this answer to make it work.