2017-08-02 122 views
4

編輯:我沒有最終根據discussionStephen Cleary做這種方法。如果你對我的做法感興趣,請看下面我的answer這種方法比僅僅在Task.Run中觸發stream.Read()更好嗎?

我正在尋找一種方法來從NetworkStream超時異步讀取。當然,問題在於無法取消NetworkStream上的ReadAsync(),因爲它只是忽略了CancellationToken。我讀了一個答案,建議關閉Token取消流,但在我的情況下,這不是一個選項,因爲Tcp連接必須保持打開狀態。所以我想出了下面的代碼,但我想知道這是否比做一個

Task.Run(() => stream.Read(buffer, offset, count) 

只是阻止一個線程。

public static class TcpStreamExtension 
{ 
    public static async Task<int> ReadAsyncWithTimeout(this NetworkStream stream, byte[] buffer, int offset, int count) 
    { 
     CancellationTokenSource cts = new CancellationTokenSource(); 
     bool keepTrying = true; 
     Timer timer = new Timer(stream.ReadTimeout); 
     timer.Elapsed += new ElapsedEventHandler((sender, args) => stopTrying(sender, args, cts, out keepTrying)); 
     timer.Start(); 

     try 
     { 
      if (stream.CanRead) 
      { 
       while (true) 
       { 
        if (stream.DataAvailable) 
        { 
         return await stream.ReadAsync(buffer, offset, count, cts.Token).ConfigureAwait(false); 
        } 

        if (keepTrying) 
        { 
         await Task.Delay(300, cts.Token).ConfigureAwait(false); 
        } 
        else 
        { 
         cts.Dispose(); 
         timer.Dispose(); 
         throw new IOException(); 
        } 
       } 
      } 
     } 
     catch (TaskCanceledException tce) 
     { 
      // do nothing 
     } 
     finally 
     { 
      cts.Dispose(); 
      timer.Dispose(); 
     } 
     if (stream.DataAvailable) 
     { 
      return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false); 
     } 

     throw new IOException(); 
    } 

    private static void stopTrying(object sender, ElapsedEventArgs args, CancellationTokenSource cts, out bool keepTrying) 
    { 
     keepTrying = false; 
     cts.Cancel(); 
    } 

} 

的應用具有潛在的能夠與幾千個端點進行通信和我想要的方式,也不會封鎖一堆線程,因爲大多數它的工作是IO來創建它。另外,超時的情況應該是

+3

您可以在不關閉TCP連接的情況下關閉NetworkStream,如果您從TCPClient獲取流但不使用它,請獲取套接字並手動創建具有以下超載的流:'public NetworkStream(Socket socket,\t bool ownsSocket)'with 'ownSocket = false',這樣就可以使TCP連接保持打開狀態,並且可以根據需要創建儘可能多的數據流。 – Gusman

+0

看來,如果stream.ReadAsync引發異常,cts和timer將不會被處置。 –

+0

備註:確保在取消讀取操作後可靠地找到下一條消息的開頭,因爲您不知道蒸汽的當前狀態。 –

回答

0

基礎上discussionStephen Cleary和他的建議,我把在我如何實現這個第二的外觀和我不每讀取超時的方法去,但它保持,只要打開作爲TcpClient的是開放的,然後從不同的代碼控制超時。我用Task.Run(() => beginReading());所以當然會在使用線程池,但我認爲它是好的,因爲大多數的線程會被擊中await,因此可以自由

這裏的時間是我實現:

private readonly Queue<byte> bigBuffer = new Queue<byte>(); 
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1); 

// This is called in a Task.Run() 
private async Task beginReading() 
{ 
    byte[] buffer = new byte[1024]; 

    using (_shutdownToken.Register(() => m_TcpStream.Close())) 
    { 
     while (!_shutdownToken.IsCancellationRequested) 
     { 
      try 
      { 
       int bytesReceived = 0; 
       if (m_TcpStream.CanRead) 
       { 
        bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); 
       } 
       else 
       { 
        // in case the stream is not working, wait a little bit 
        await Task.Delay(3000, _shutdownToken); 
       } 

       if (bytesReceived > 0) 
       { 
        for (int i = 0; i < bytesReceived; i++) 
        { 
         bigBuffer.Enqueue(buffer[i]); 
        } 

        _signal.Release(); 
        Array.Clear(buffer, 0, buffer.Length); 

       } 
      } 
      catch (Exception e) 
      { 
       LoggingService.Log(e); 
      } 
     } 
    } 
} 

private async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count) 
{ 
    int bytesToBeRead = 0; 

    if (!m_TcpClient.Connected) 
    { 
     throw new ObjectDisposedException("Socket is not connected"); 
    } 

    if (bigBuffer.Count > 0) 
    { 
     bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count; 

     for (int i = offset; i < bytesToBeRead; i++) 
     { 
      buffer[i] = bigBuffer.Dequeue(); 
     } 

     // Clear up the semaphore in case of a race condition where the writer just wrote and then this came in and read it without waiting 
     if (_signal.CurrentCount > 0) 
      await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false); 

     return bytesToBeRead; 
    } 

    // In case there is nothing in the Q, wait up to timeout to get data from the writer 
    await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false); 

    // read again in case the semaphore was signaled by an Enqueue 
    if (bigBuffer.Count > 0) 
    { 
     bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count; 

     for (int i = offset; i < bytesToBeRead; i++) 
     { 
      buffer[i] = bigBuffer.Dequeue(); 
     } 


     return bytesToBeRead; 
    } 

    // This is because the synchronous NetworkStream Read() method throws this exception when it times out 
    throw new IOException(); 
} 
0

對於類似的用例,我使用了一個Task.Delay()任務超時。 是這樣的:

public static async Task<int> ReadAsync(
     NetworkStream stream, byte[] buffer, int offset, int count, int timeoutMillis) 
{ 
     if (timeoutMillis < 0) throw new ArgumentException(nameof(timeoutMillis)); 
     else if (timeoutMillis == 0) 
     { 
      // No timeout 
      return await stream.ReadAsync(buffer, offset, count); 
     } 

     var cts = new CancellationTokenSource(); 
     var readTask = stream.ReadAsync(buffer, offset, count, cts.Token); 
     var timerTask = Task.Delay(timeoutMillis, cts.Token); 

     var finishedTask = await Task.WhenAny(readTask, timerTask); 
     var hasTimeout = ReferenceEquals(timerTask, finishedTask); 
     // Cancel the timer which might be still running 
     cts.Cancel(); 
     cts.Dispose(); 

     if (hasTimeout) throw new TimeoutException(); 
     // No timeout occured 
     return readTask.Result; 
} 
+0

我最初採用相同的方法,但後來我發現有些消息會丟失。我認爲發生了什麼是readAsync調用將保持打開並泄漏消息。 –

+0

一旦進入第一次超時並取消讀取,您基本上不再保證流的狀態。可能已經有一些數據已經被使用,或者可能沒有。您可以做的唯一事情就是在該層發生任何超時後關閉網絡流。 – Matthias247

+0

其實我試過了Gusman在問題的評論中提出的建議,但似乎也沒有效果。在沒有客戶端的情況下關閉流也會導致數據丟失。 –

3

首先,你正在試圖做的是根本性的缺陷。您應該始終從開放的TCP/IP流中讀取 - 只要一次讀取某些數據,就將其傳遞並開始下一次讀取。

所以,我的第一個建議是不是需要首先取消可讀的閱讀。相反,始終保持閱讀。同樣,使用DataAvailable是一種代碼異味。

更多的解釋...

沒有「強制執行」消除對非取消代碼的好方法。關閉TCP/IP套接字是最簡單和最乾淨的方法。您的現有解決方案將無法工作,因爲ReadAsync忽略了CancellationToken。所以它沒有比使用CancellationToken沒有定時器更好。如果ReadAsync忽略CancellationToken,您唯一真正的選擇是關閉套接字。任何其他解決方案都可能導致「丟失數據」 - 從套接字中讀取但丟棄的數據。

+0

這就是我正在尋找的那種批評。我會嘗試@Gusman在評論中提出的建議,並將報告如何進行。 –

+1

@SvetAngelov:我不確定這是否會奏效。這取決於'NetworkStream'如何從'Socket'中讀取。許多流都有某種緩衝區,如果流關閉並且套接字被重用,這會導致數據丟失。 –

+0

我希望我有辦法解決取消問題,但不幸的是,我們所連接的端點,一個不會改變的遺留設備,希望我們嘗試連接一段時間,如果我們沒有得到答覆,我們要取消,然後等一會再試。我會看看我是否可以做任何事情來解決這個問題 –

相關問題