2015-11-04 96 views
0

這是我的情景:如何將寫入流1的內容流式傳輸到流2?

producer.WriteStream(stream); 
consumer.ReadStream(stream); 

我想要的東西,允許由producer生成的字節將逐步轉移到consumer

我可以寫一切到MemoryStream,然後倒帶它並在consumer上讀取它,但這會導致巨大的內存消耗。

我該如何做到這一點?

+0

使用2實例(https://msdn.microsoft.com/en-us/library/system.io.pipes.pipestream(V = vs.110)的.aspx),1至讀(客戶端)和1寫(服務器)。 – Amit

+0

謝謝@Amit,你能否詳細說明如何將這些流「綁定」在一起..這對我來說並不清楚。 –

+0

如果您需要將數據從一個數據流傳輸到另一個數據流,通常通過從數據源讀取數據塊(例如1K或4K)並將數據放入目標,直到源數據流爲空。 – Oliver

回答

2

使用管道作爲數據的底層傳輸,可以有一個「寫入流」(服務器)和一個允許這種通信機制的「讀取流」(客戶端)。

使用匿名管道或命名管道(如果需要進程間通信)很簡單。要創建管道流:

AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream(); 
AnonymousPipeClientStream pipeClient = 
    new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString()); 

現在,您可以用這些來寫&讀:

producer.WriteStream(pipeServer); 
// somewhere else... 
consumer.ReadStream(pipeClient); 
+0

這比我的解決方案容易得多。 –

+0

工程就像一個魅力 –

1

我只是把這個共同的樂趣,這是未經測試,可能有一些錯誤。您只需將ReaderStream傳遞給讀者,並將WriterStream傳遞給作者。的[PipeStream]

public class LoopbackStream 
{ 
    public Stream ReaderStream { get; } 
    public Stream WriterStream { get;} 

    private readonly BlockingCollection<byte[]> _buffer; 

    public LoopbackStream() 
    { 
     _buffer = new BlockingCollection<byte[]>(); 
     ReaderStream = new ReaderStreamInternal(_buffer); 
     WriterStream = new WriterStreamInternal(_buffer); 
    } 

    private class WriterStreamInternal : Stream 
    { 
     private readonly BlockingCollection<byte[]> _buffer; 

     public WriterStreamInternal(BlockingCollection<byte[]> buffer) 
     { 
      _buffer = buffer; 
      CanRead = false; 
      CanWrite = false; 
      CanSeek = false; 
     } 

     public override void Close() 
     { 
      _buffer.CompleteAdding(); 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      var newData = new byte[count]; 
      Array.Copy(buffer, offset, newData, 0, count); 
      _buffer.Add(newData); 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead { get; } 
     public override bool CanSeek { get; } 
     public override bool CanWrite { get; } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 
    private class ReaderStreamInternal : Stream 
    { 
     private readonly BlockingCollection<byte[]> _buffer; 
     private readonly IEnumerator<byte[]> _readerEnumerator; 
     private byte[] _currentBuffer; 
     private int _currentBufferIndex = 0; 

     public ReaderStreamInternal(BlockingCollection<byte[]> buffer) 
     { 
      _buffer = buffer; 
      CanRead = true; 
      CanWrite = false; 
      CanSeek = false; 
      _readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator(); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       _readerEnumerator.Dispose(); 
      } 
      base.Dispose(disposing); 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      if (_currentBuffer == null) 
      { 
       bool read = _readerEnumerator.MoveNext(); 
       if (!read) 
        return 0; 
       _currentBuffer = _readerEnumerator.Current; 
      } 

      var remainingBytes = _currentBuffer.Length - _currentBufferIndex; 
      var readBytes = Math.Min(remainingBytes, count); 
      Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes); 
      _currentBufferIndex += readBytes; 

      if (_currentBufferIndex == _currentBuffer.Length) 
       _currentBuffer = null; 

      return readBytes; 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead { get; } 
     public override bool CanSeek { get; } 
     public override bool CanWrite { get; } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 
}