2017-07-25 93 views
1

我有一個流可以同時讀取和寫入。 MSDN文檔說,只有到達流的末尾時,我才應該從Read(buffer, offset, count)方法中返回零。實現異步寫入時的讀取

由於讀寫操作是異步的,所以如果內部緩衝區爲空,讀取需要等待另一個寫入。

我正在努力寫作方法如何表示一切都已寫好。我能想到的最好的方法是Dispose()(或Close())方法將標誌着寫作的結束,但這感覺非常錯誤。

我流類實現爲:

public class ContinuousStream : Stream 
{ 
    private readonly IProducerConsumerCollection<byte> _buffer; 

    public ContinuousStream() => _buffer = new ConcurrentQueue<byte>(); 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     var actualBytesRead = 0; 

     for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++) 
     { 
      buffer[i] = b; 
      actualBytesRead++; 
     } 

     return actualBytesRead; 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     for (var i = offset; i < maxByteCount; i++) 
     { 
      _buffer.TryAdd(buffer[i]); 
     } 
    } 
} 

爲了澄清

同時讀取和寫入發生。閱讀不應該等到所有內容在運行之前進行緩衝。我怎樣才能向這個信號流發出將要發生的所有寫作都已經發生?

+0

有許多不同的方法可以解決這個,使你的問題太寬泛。但是,你可以看看'NetworkStream'作爲例子。它包裝了一個'Socket',它有一個指示流結束的機制('Shutdown()')。你可以簡單地在你的類中實現類似的東西,這是寫代碼調用的一種方法,表示它已經完成了寫操作,這樣你就可以知道何時從讀操作中返回0字節。您也可以直接使用NetworkStream(即使用'Socket')或一些非流機制來進行通信(例如'BlockingCollection ')。 –

+0

謝謝@PeterDuniho。實際上我剛剛添加了一個'CloseWrite()'方法,它將表示不會再有任何寫入。起初它感覺錯了,因爲它不能用於需要流的地方。我越想到實際的例子,我認爲這可能是正確的解決方案。 – BanksySan

回答

0

其實,我不太明白爲什麼你需要爲這個流,不能使用IProducerConsumerCollection單獨。許多這些參數都沒有意義。只需使用TryTake & TryAdd而不是讀取&寫在第一位。

但是你可以使用這個的AutoResetEvent

public class ContinuousStream : Stream 
{ 
    private readonly AutoResetEvent _are = new AutoResetEvent(false); 
    private readonly IProducerConsumerCollection<byte> _buffer = new ConcurrentQueue<byte>(); 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     _are.WaitOne(); 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     var actualBytesRead = 0; 

     for (var i = offset; i < maxByteCount && _buffer.TryTake(out var b); i++) 
     { 
      buffer[i] = b; 
      actualBytesRead++; 
     } 

     return actualBytesRead; 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     var maxByteCount = offset + count > buffer.Length ? buffer.Length : count; 
     for (var i = offset; i < maxByteCount; i++) 
     { 
      _buffer.TryAdd(buffer[i]); 
      _are.Set(); 
     } 
    } 
} 
+0

這是一個流,以便它可以用於接受流的地方。你說得對,它只是一個Queue的包裝。 – BanksySan

+0

看看你有什麼,它會需要所有的寫作之前完成任何閱讀可以發生? – BanksySan

+0

什麼是offset BanksySan