2017-07-31 86 views
0

我最近一直在爲項目使用SocketAsyncEventArgs,並且遇到了ReceiveAsync偶爾以與通過SendAsync發送的內容不同的順序獲取數據的問題。在SendAsync方法中發送的每個數據塊都保留,但塊不一定按正確的順序排列。也許我對SendAsync方法有不正確的理解,但我認爲特別是使用SocketType.Stream和ProtocolType.Tcp將確保順序得到維護。我知道底層進程將不可避免地破壞消息,並且ReceiveAsync通常會讀取少於緩衝區分配的內容。但我認爲發送和接收流將維持秩序。SocketAsyncEventArgs發送/接收訂單

我製作了一個顯示問題的測試控制檯程序。它每次使用不同的套接字和端口集嘗試運行約20次。在我的筆記本電腦上,它通常會通過一次,然後再次失敗;通常在期待第二個時候會收到一個後面的塊。從其他測試中,我知道預期的塊最終會出現,只是順序不對。

一個警告是我能夠在Windows 2008遠程服務器上測試它,並且沒有問題。但是,它從來沒有接近完成我的筆記本電腦。事實上,如果我讓調試執行在異常暫停中暫停一段時間,我已經不止一次地完全凍結了我的筆記本電腦,並且必須重新啓動硬盤。這是我使用VS2017在Windows 7上運行的筆記本電腦。我不確定它是否可能是一個因素,但它正在運行Symantec Endpoint Protection,儘管我沒有在日誌中找到任何內容。

所以我的問題是,我有一個SocketAsyncEventArgs如何操作的錯誤觀點?或者,我的代碼是一場災難(也許都是)?這對我的筆記本電腦來說有點獨特嗎? (這最後一個讓我覺得我設置了尷尬的時候你是新來的編程一樣,你覺得一定有什麼錯誤的編譯器。)

using System; 
using System.IO; 
using System.Linq; 
using System.Net; 
using System.Net.Sockets; 
using System.Threading.Tasks; 

static class DumTest 
{ 

    static void Main(string[] args) 
    { 
     for (int i = 9177; i < 9199; i++) 
     { 
      RunDum(i); 
      //Thread.Sleep(350); 
     } 

     Console.WriteLine("all done."); 
     Console.ReadLine(); 
    } 

    static void RunDum(int port) 
    { 
     var dr = new DumReceiver(port); 
     var ds = new DumSender(port); 

     dr.Acception.Wait(); 

     ds.Connection.Wait(); 

     dr.Completion.Wait(); 

     ds.Completion.Wait(); 

     Console.WriteLine($"Completed {port}. " + 
      $"sent: {ds.SegmentsSent} segments, received: {dr.SegmentsRead} segments"); 
    } 
} 

class DumReceiver 
{ 
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs(); 
    private readonly TaskCompletionSource<object> tcsAcc = new TaskCompletionSource<object>(); 

    private TaskCompletionSource<object> tcsRcv; 
    private Socket socket; 

    internal DumReceiver(int port) 
    { 
     this.eva.Completed += this.Received; 

     var lstSock = new Socket(
      AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 

     var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList 
      .First(i => i.AddressFamily == AddressFamily.InterNetwork); 

     lstSock.Bind(new IPEndPoint(localIP, port)); 
     lstSock.Listen(1); 

     var saea = new SocketAsyncEventArgs(); 
     saea.Completed += this.AcceptCompleted; 
     lstSock.AcceptAsync(saea); 
    } 

    internal Task Acception => this.tcsAcc.Task; 

    internal Task Completion { get; private set; } 

    internal int SegmentsRead { get; private set; } 

    private void AcceptCompleted(object sender, SocketAsyncEventArgs e) 
    { 
     if (e.SocketError == SocketError.Success) 
     { 
      this.socket = e.AcceptSocket; 
      e.Dispose(); 
      try 
      { 
       this.Completion = this.ReceiveLupeAsync(); 
      } 
      finally 
      { 
       this.tcsAcc.SetResult(null); 
      } 
     } 
     else 
     { 
      this.tcsAcc.SetException(new SocketException((int)e.SocketError)); 
     } 
    } 

    private async Task ReceiveLupeAsync() 
    { 
     var buf = new byte[8196]; 
     byte bufSeg = 1; 
     int pos = 0; 

     while (true) 
     { 
      this.tcsRcv = new TaskCompletionSource<object>(); 
      this.eva.SetBuffer(buf, pos, 8196 - pos); 
      if (this.socket.ReceiveAsync(this.eva)) 
      { 
       await this.tcsRcv.Task.ConfigureAwait(false); 
      } 

      if (this.eva.SocketError != SocketError.Success) 
      { 
       throw new SocketException((int)eva.SocketError); 
      } 

      if (this.eva.BytesTransferred == 0) 
      { 
       if (pos != 0) 
       { 
        throw new EndOfStreamException(); 
       } 

       break; 
      } 

      pos += this.eva.BytesTransferred; 
      if (pos == 8196) 
      { 
       pos = 0; 
       for (int i = 0; i < 8196; i++) 
       { 
        if (buf[i] != bufSeg) 
        { 
         var msg = $"Expected {bufSeg} but read {buf[i]} ({i} of 8196). " + 
          $"Last read: {this.eva.BytesTransferred}."; 
         Console.WriteLine(msg); 
         throw new Exception(msg); 
        } 
       } 

       this.SegmentsRead++; 
       bufSeg = (byte)(this.SegmentsRead + 1); 
      } 
     } 
    } 

    private void Received(object s, SocketAsyncEventArgs e) => this.tcsRcv.SetResult(null); 
} 

class DumSender 
{ 
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs(); 
    private readonly Socket socket = new Socket(
     AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 

    private readonly TaskCompletionSource<object> tcsCon = new TaskCompletionSource<object>(); 
    private TaskCompletionSource<object> tcsSnd; 

    internal DumSender(int port) 
    { 
     this.eva.Completed += this.Sent; 

     var saea = new SocketAsyncEventArgs(); 
     var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList 
      .First(i => i.AddressFamily == AddressFamily.InterNetwork); 

     saea.RemoteEndPoint = new IPEndPoint(localIP, port); 
     saea.Completed += this.ConnectionCompleted; 
     this.socket.ConnectAsync(saea); 
    } 

    internal Task Connection => this.tcsCon.Task; 

    internal Task Completion { get; private set; } 

    internal int SegmentsSent { get; private set; } 

    private void ConnectionCompleted(object sender, SocketAsyncEventArgs e) 
    { 
     if (e.SocketError == SocketError.Success) 
     { 
      e.Dispose(); 

      try 
      { 
       this.Completion = this.SendLupeAsync(); 
      } 
      finally 
      { 
       this.tcsCon.SetResult(null); 
      } 
     } 
     else 
     { 
      this.tcsCon.SetException(new SocketException((int)e.SocketError)); 
     } 
    } 

    private async Task SendLupeAsync() 
    { 
     var buf = new byte[8196]; 
     byte bufSeg = 1; 

     while (true) 
     { 
      for (int i = 0; i < 8196; i++) 
      { 
       buf[i] = bufSeg; 
      } 

      this.tcsSnd = new TaskCompletionSource<object>(); 
      this.eva.SetBuffer(buf, 0, 8196); 
      if (this.socket.SendAsync(this.eva)) 
      { 
       await this.tcsSnd.Task.ConfigureAwait(false); 
      } 

      if (this.eva.SocketError != SocketError.Success) 
      { 
       throw new SocketException((int)this.eva.SocketError); 
      } 

      if (this.eva.BytesTransferred != 8196) 
      { 
       throw new SocketException(); 
      } 

      if (++this.SegmentsSent == 299) 
      { 
       break; 
      } 

      bufSeg = (byte)(this.SegmentsSent + 1); 
     } 

     this.socket.Shutdown(SocketShutdown.Both); 
    } 

    private void Sent(object s, SocketAsyncEventArgs e) => this.tcsSnd.SetResult(null); 
} 
+1

我無法使用您發佈的代碼重現您的問題(Windows 10 Pro)。代碼對我來說看起來沒問題。除非我需要最大的可伸縮性,否則我不會在Socket中使用'XXXAsync()'方法。使用'NetworkStream'和傳統的'await'友好的'XXXAsync()'方法,代碼更容易讀寫。但是我沒有在代碼中看到任何實際的_bug_,並且對於套接字中可用的任何異步機制,存在潛在的緩衝區重新排序問題,只有當您有兩個或多個併發排隊的讀取操作時纔會出現你沒有。 –

+0

FWIW,我增加了測試代碼,以運行22個端口測試的20次迭代,其中2990個段而不是299個,並且仍然無法使其失敗。坦率地說,如果你有任何類型的AV軟件,那麼如果你開始看到來自I/O代碼的意外行爲,那應該總是你禁用的第一件事。這類軟件中的錯誤太常見了,而且在向更大的社區尋求幫助之前,不妨試試這種可能性,以便有任何藉口不會這樣做。 –

+0

我很感激你花時間來測試它,並道歉,如果這只是浪費時間。我天真地認爲,因爲連接是從本地主機進行的,所以它不在AV的範圍內。我嘗試關閉它,然後用相同的結果測試它。這樣的公司設置並沒有讓它變得非常直觀或容易,所以它可能還在運行。我明天會有更多的選擇來測試它。感謝您的理智檢查。 – alex98101

回答

0

我相信這個問題是在你的碼。

您必須檢查返回的Socket*Async方法使用SocketAsyncEventArgs。如果它們返回false,則它們不會引發SocketAsyncEventArgs.Completed事件,您必須同步處理結果。

參考文檔:SocketAsyncEventArgs Class。搜索willRaiseEvent

DumReceiver的構造函數中,您不檢查AcceptAsync的結果,並且在同步完成時不處理大小寫。

DumSender的構造函數中,不檢查ConnectAsync的結果,並且在同步完成時不處理大小寫。

最重要的是,SocketAsyncEventArgs.Completed事件可能會在其他某個線程中引發,最有可能是ThreadPool的I/O線程。

每次分配給DumReceiver.tcsRcvDumSender.tcsSnd沒有適當的同步時間,你不能確保DumReceiver.ReceivedDumSender.Sent正在使用最新的TaskCompletionSource

實際上,您可以在第一次迭代中獲得NullReferenceException

你缺乏同步:

  • DumReceiver,字段tcsRcvsocket和性能CompletionSegmentsRead

  • DumSender,現場tcsSnd和性能CompletionSegmentsSent

我建議您考慮在每次調用ReceiveAsyncSendAsync時使用單個SemaphoreSlim而不是創建新的TaskCompletionSource。您將在構造函數中將信號量初始化爲0。如果*Async操作處於待處理狀態,則您將在信號量上爲await WaitAsync,而Completed事件爲Release信號量。

這應該足以擺脫TaskCompletionSource字段中的競爭條件。您仍然需要在其他字段和屬性上進行適當的同步。例如,沒有理由爲什麼Completion不能在構造函數中創建,並且SegmentsReadSegmentsSent可能是隻讀的,並且指的是可以用一個或多個Interlocked方法內部訪問的字段(例如Interlocked.IncrementInterlocked.Add) 。

+0

給出的示例代碼只是爲了展示我正在努力的問題。如果代碼被用於超出範圍的任何事情,我當然會更加在意如何處理AcceptAsync和ConnectAsync。我描述的問題顯然與處理成功的聽/連接無關。您的同步斷言並不完全正確,因爲發送方或接收方都沒有同時運行操作。也就是說,直到前一個完成後才能開始接收。我認爲彼得在原評論中做了很好的幫助我隔離真正的問題。 – alex98101

+0

那麼,真正的問題是什麼? – acelent

+0

可能是我工作筆記本電腦上的防病毒軟件。可能還有別的東西,但仍然是我的機器特有的。重要的部分是在任何其他機器上測試時問題都消失了。 – alex98101