2011-12-24 158 views
1

我在C#中創建了一個網絡庫,我可以在任何應用程序中使用它,並且作爲此庫的一部分,我有一個TCP客戶端/服務器設置。這種設置在幾乎所有情況下都可以完美運行它在最小和中等壓力負載下連接,發送/接收數據,並完美斷開連接。但是,當我從客戶端向服務器發送大量數據時,客戶端套接字的工作時間不同(有時很短,有時很長),然後拒絕發送數據一段時間。具體來說,我的數據速率從550-750 KBps範圍變爲0 KBps,並且在那裏再次有不同的時間。然後,套接字將在很短的時間內再次開始發送,並再次進行「限制」。在節流,我是假設該插座被斷開,因爲我無法發送任何東西,但查詢返回在插槽中使用此代碼連接:套接字在一段時間後受到限制


public bool IsConnected(Socket socket) 
{ 
    try 
    { 
     return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0); 
    } 
    catch (SocketException) { return false; } 
} 

我只花了聯網類在我的大學,所以我開始考慮TCP中的擁塞控制和流量控制機制,但在我看來,這兩者都不會導致這個問題。擁塞控制只會降低數據速率,而接收端的完整緩衝區不會持續接近我獲得0 KBps數據速率的時間長度。症狀似乎指向某種類型的重型數據節流或大規模丟包。

我的問題是這樣的:沒有人知道什麼可能導致這個數據「節流」,因爲缺乏更好的術語?另外,我發送的數據包是否可能比我的路由器更遠,即使它們發往同一子網中的主機?

編輯:就這樣很明顯,我試圖解決這個問題的原因是因爲我想通過TCP以儘可能高的數據速率發送文件。我知道UDP也可以使用,而且我也將使用它,但我希望TCP首先工作。

的具體信息:

我使用的是阻塞讀/寫操作,而服務器是多線程的。客戶端也在其自己的線程上運行。我正在測試我的本地子網,通過我的路由器彈跳所有數據包,這應該有54 Mbps的吞吐量。數據包的大小爲8KB,最多每秒發送1000次(發送線程休眠1ms),但顯然未達到該速率。減小數據包的大小,使數據速率較低,導致節流消失。 Windows 7機器,1臺服務器,1臺客戶機。發送操作總是完成,出錯的接收操作。

,發送操作如下:


//get a copy of all the packets currently in the queue 
        IPacket[] toSend; 
        lock (packetQueues[c]) 
        { 
         if (packetQueues[c].Count > SEND_MAX) 
         { 
          toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray(); 
          packetQueues[c].RemoveRange(0, SEND_MAX); 
         } 
         else 
         { 
          toSend = packetQueues[c].ToArray(); 
          packetQueues[c].RemoveRange(0, toSend.Length); 
         } 
        } 
        if (toSend != null && toSend.Length > 0) 
        { //write the packets to the network stream 
         try 
         { 
          writer.Write(toSend.Length); 
         } 
         catch (Exception e) 
         { 
          Logger.Log(e); 
          if (showErrorMessages) 
           MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK); 
         } 
         for (int i = 0; i < toSend.Length; i++) 
         { 
          try 
          { 
           toSend[i].Write(writer); 
           if (onSend != null) 
           { 
            object[] args = new object[2]; 
            args[0] = c; 
            args[1] = toSend[i]; 
            onSend(args); 
           } 
          } 
          catch (Exception e) 
          { 
           Logger.Log(e); 
           if (showErrorMessages) 
            MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK); 
          } 
         } 
        } 

這是接收代碼:


try 
        { 
         //default buffer size of a TcpClient is 8192 bytes, or 2048 characters 
         if (client.Available > 0) 
         { 
          int numPackets = reader.ReadInt32(); 
          for (int i = 0; i < numPackets; i++) 
          { 
           readPacket.Clear(); 
           readPacket.Read(reader); 
           if (owner != null) 
           { 
            owner.AcceptPacket(readPacket, c); //application handles null packets itself. 
            if (onReceive != null) 
            { 
             object[] args = new object[2]; 
             args[0] = c; 
             args[1] = readPacket; 
             onReceive(args); 
            } 
           } 
          } 
          timestamps[c] = TimeManager.GetCurrentMilliseconds(); 
         } 
         else 
         { 
          double now = TimeManager.GetCurrentMilliseconds(); 
          if (now - timestamps[c] >= timeToDisconnect) 
          { //if timestamp is old enough, check for connection. 
           connected[c] = IsConnected(client.Client); 
           if (!connected[c]) 
           { 
            netStream.Close(); 
            clients[c].Close(); 
            numConnections--; 
            if (onTimeout != null) onTimeout(c); 
           } 
           else 
           { 
            timestamps[c] = now; 
           } 
          } 
         } 

        } 
        catch (Exception s) 
        { 
         Logger.Log(s); 
         if (showErrorMessages) 
          MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK); 
        } 

包發送/接收:


public void Write(BinaryWriter w) 
     { 
      w.Write(command); //byte 
      w.Write(data.Type); //short 
      w.Write(data.Data.Length); //int 
      w.Write(data.Data); //byte array 
      w.Flush(); 
     } 

     /// <summary> 
     /// Reads a command packet from data off a network stream. 
     /// </summary> 
     /// <param name="r">The stream reader.</param> 
     public void Read(BinaryReader r) 
     { 
      command = r.ReadByte(); 
      short dataType = r.ReadInt16(); 
      int dataSize = r.ReadInt32(); 
      byte[] bytes = r.ReadBytes(dataSize); 
      data = new PortableObject(dataType, bytes); 
     } 

完整的服務器通信循環:


public void Communicate(object cl) 
     { 
      int c = (int)cl; 
      timestamps[c] = TimeManager.GetCurrentMilliseconds(); 
      try 
      { 
       //Console.Out.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " has started up. c = " + (int)c); 

       TcpClient client = clients[c]; 
       client.ReceiveTimeout = 100; 

       NetworkStream netStream = client.GetStream(); 
       BinaryReader reader = new BinaryReader(netStream); 
       BinaryWriter writer = new BinaryWriter(netStream); 

       while (client != null && connected[c]) 
       { 
        #region Receive 
        try 
        { 
         //default buffer size of a TcpClient is 8192 bytes, or 2048 characters 
         if (client.Available > 0) 
         { 
          int numPackets = reader.ReadInt32(); 
          for (int i = 0; i < numPackets; i++) 
          { 
           readPacket.Clear(); 
           readPacket.Read(reader); 
           if (owner != null) 
           { 
            owner.AcceptPacket(readPacket, c); //application handles null packets itself. 
            if (onReceive != null) 
            { 
             object[] args = new object[2]; 
             args[0] = c; 
             args[1] = readPacket; 
             onReceive(args); 
            } 
           } 
          } 
          timestamps[c] = TimeManager.GetCurrentMilliseconds(); 
         } 
         else 
         { 
          double now = TimeManager.GetCurrentMilliseconds(); 
          if (now - timestamps[c] >= timeToDisconnect) 
          { //if timestamp is old enough, check for connection. 
           connected[c] = IsConnected(client.Client); 
           if (!connected[c]) 
           { 
            netStream.Close(); 
            clients[c].Close(); 
            numConnections--; 
            if (onTimeout != null) onTimeout(c); 
           } 
           else 
           { 
            timestamps[c] = now; 
           } 
          } 
         } 

        } 
        catch (Exception s) 
        { 
         Logger.Log(s); 
         if (showErrorMessages) 
          MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK); 
        } 
        #endregion 

        Thread.Sleep(threadLatency); 

        #region Send 
        //get a copy of all the packets currently in the queue 
        IPacket[] toSend; 
        lock (packetQueues[c]) 
        { 
         if (packetQueues[c].Count > SEND_MAX) 
         { 
          toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray(); 
          packetQueues[c].RemoveRange(0, SEND_MAX); 
         } 
         else 
         { 
          toSend = packetQueues[c].ToArray(); 
          packetQueues[c].RemoveRange(0, toSend.Length); 
         } 
        } 
        if (toSend != null && toSend.Length > 0) 
        { //write the packets to the network stream 
         try 
         { 
          writer.Write(toSend.Length); 
         } 
         catch (Exception e) 
         { 
          Logger.Log(e); 
          if (showErrorMessages) 
           MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK); 
         } 
         for (int i = 0; i < toSend.Length; i++) 
         { 
          try 
          { 
           toSend[i].Write(writer); 
           if (onSend != null) 
           { 
            object[] args = new object[2]; 
            args[0] = c; 
            args[1] = toSend[i]; 
            onSend(args); 
           } 
          } 
          catch (Exception e) 
          { 
           Logger.Log(e); 
           if (showErrorMessages) 
            MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK); 
          } 
         } 
        } 
        #endregion 
       } 
      } 
      catch (ThreadAbortException tae) 
      { 
       Logger.Log(tae); 
       MessageBox.Show("Thread " + (int)cl + " was aborted.", "Error", MessageBoxButtons.OK); 
      } 
     } 
+0

儘管我看到了很多代碼,但沒有什麼用於調試您的問題。 BTW,用TCP發送和接收數據包並不難。您不應該擔心網絡速度,MTU等。 – 2011-12-24 20:19:00

+0

@ L.B正如我在問題中所說的,我的解決方案在大多數情況下都能正常工作。只有在壓力很大的情況下,我遇到了我所描述的問題,並且TCP中的發送/接收很容易,這正是我尋找MTU等其他問題的原因。我提供的代碼僅僅是爲了防止任何人想看我是如何發送/接收和檢查連接。 – Darkhydro 2011-12-24 20:31:57

+1

我只能說,這很可能是您項目中的錯誤/錯誤代碼。玩TCP優化不是真正的方法。否則,只有網絡專家才能夠編寫基於TCP的代碼 – 2011-12-24 20:38:55

回答

2

這可能是你的代碼,但我們很難說,因爲它是不完整的。

我在.NET TCP/IP FAQ中寫下了自己的一套最佳實踐 - 經過多年的TCP/IP經驗。我建議你從此開始。

P.S.我在線上保留數據包的術語「數據包」。 TCP應用程序無法控制數據包。我使用術語「消息」來表示應用程序協議級別的消息。我認爲這可以減少混亂,特別是對於新來者。

+0

如果您有時間查看,我已經添加了完整的服務器通信循環。客戶端的通信環路非常相似,但一次只支持一個客戶端。除此之外,確實沒有太多的代碼。 – Darkhydro 2011-12-25 00:39:35

+0

看完你的一篇文章之後,看來我的閱讀/寫作方法從一開始就完全錯了。 BinaryReader的接收方法是否可以只讀取流中的部分數據?它不會返回多少被讀取的,只有被讀取的,所以我無法知道部分接收 – Darkhydro 2011-12-25 01:03:43

0

不真的瞭解C#並且此代碼不完整。如果我找對了,那麼

readPacket.Read(reader); 

會讀取任何可用的內容,並且您的接收器結束for循環將被打翻。你在哪裏檢查讀取的字節數量?

總之,要檢查所發生的事情在TCP級別的好方法下是wireshark

+0

我添加了用於讀寫CommandPacket類的代碼,該代碼派生自IPacket,並且是我用作readPacket變量的代碼。我會使用wireshark,除了在發生錯誤之前我發送的數據包的數量是幾千個。 – Darkhydro 2011-12-24 20:58:07

1

如果您要創建在C#中

網絡庫,我可以在任何應用程序

使用你意識到任何現有的開源庫,在那裏的? networkComms.net可能是一個好的開始。如果你可以重新創建相同的問題,我會很驚訝。我親自用它來維護超過1000個併發連接,每個連接每秒發送大約10個數據包。否則,如果你想繼續使用你的代碼,或許看看networkComms.net的source可以指出你可能會出錯的地方。

+0

我的意思是任何.NET應用程序,如果這是誤導性的,我很抱歉。 – Darkhydro 2011-12-25 00:33:41

+0

仍然困惑。 networkComms.net是爲在.net應用程序中使用而編寫的。 – MarcF 2011-12-25 09:50:48

+0

我想寫我自己的庫,而不是使用另一個已有的庫。儘管如此,它可能是一個很好的圖書館。 – Darkhydro 2011-12-25 16:51:03

1

沒有仔細看看你的代碼片段,但我看到你在那裏有分配 - 你有沒有檢查你在垃圾回收器上施加的壓力?

PS:(sending thread sleeps 1 ms) - 請記住,沒有timeBeginPeriod()的Sleep()不會獲得您的1ms分辨率 - 可能會接近10-20ms,具體取決於Windows版本和硬件。