2016-12-26 66 views
0

我有一個來自分叉進程的無限數據流。我希望這個流被一個模塊處理,有時我想複製這個流中的數據以便由不同的模塊處理(例如,監視數據流,但是如果發生任何有趣的事情,我想記錄下n個字節以便爲進一步的調查)。NodeJS流拆分

因此,讓我們假設以下情形:

  1. 我啓動該程序並開始消耗可讀流
  2. 2秒後,我想用不同的流讀取器來處理1秒的相同數據
  3. 一旦時間到了,我想關閉第二個消費者,但最初的消費者必須保持不動。

下面的代碼片段此:

var stream = process.stdout; 

stream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new PassThrough(); 
    stream.pipe(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    stream.unpipe(stream2); 
} 

我在這裏的問題是,unpiping的STREAM2沒有得到它關閉。如果我的unpipe命令後調用stream.end(),然後將其與錯誤崩潰:

events.js:160 
     throw er; // Unhandled 'error' event 
    ^

Error: write after end 
    at writeAfterEnd (_stream_writable.js:192:12) 
    at PassThrough.Writable.write (_stream_writable.js:243:5) 
    at Socket.ondata (_stream_readable.js:555:20) 
    at emitOne (events.js:101:20) 
    at Socket.emit (events.js:188:7) 
    at readableAddChunk (_stream_readable.js:176:18) 
    at Socket.Readable.push (_stream_readable.js:134:10) 
    at Pipe.onread (net.js:548:20) 

我甚至試圖暫停源流,以幫助從第二氣流衝掃的緩衝區,但它沒有工作之一:

function stopAnotherConsumer() { 
    stream.pause(); 
    stream2.once('unpipe', function() { 
     stream.resume(); 
     stream2.end(); 
    }); 
    stream.unpipe(stream2); 
} 

和以前一樣的錯誤(寫完後)。

如何解決問題?我最初的意圖是從一個點複製流數據,然後在一段時間後關閉第二個數據流。

Note: I tried to use this answer to make it work.

回答

0

由於沒有答案,我張貼我的(拼湊)解決方案。如果有人有更好的一個,不要阻止它。

一個新流:

const Writable = require('stream').Writable; 
const Transform = require('stream').Transform; 

class DuplicatorStream extends Transform { 
    constructor(options) { 
     super(options); 

     this.otherStream = null; 
    } 

    attachStream(stream) { 
     if (!stream instanceof Writable) { 
      throw new Error('DuplicatorStream argument is not a writeable stream!'); 
     } 

     if (this.otherStream) { 
      throw new Error('A stream is already attached!'); 
     } 

     this.otherStream = stream; 
     this.emit('attach', stream); 
    } 

    detachStream() { 
     if (!this.otherStream) { 
      throw new Error('No stream to detach!'); 
     } 

     let stream = this.otherStream; 
     this.otherStream = null; 
     this.emit('detach', stream); 
    } 

    _transform(chunk, encoding, callback) { 
     if (this.otherStream) { 
      this.otherStream.write(chunk); 
     } 

     callback(null, chunk); 
    } 
} 

module.exports = DuplicatorStream; 

和使用:

var stream = process.stdout; 
var stream2; 

duplicatorStream = new DuplicatorStream(); 
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
duplicatorStream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new stream.PassThrough(); 
    duplicatorStream.attachStream(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    duplicatorStream.once('detach', function() { 
     stream2.end(); 
    }); 
    duplicatorStream.detachStream(); 
}