2016-02-26 125 views
0

Boost UDP socket performance: low throughput?

I am having some trouble with the performance of Boost ASIO UDP, or at least I think I am.

I have put together a simple client/server application to illustrate:

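  1. The client starts sending large (65K) UDP packets to the server, slowly at first.
  2. The server immediately sends back an ACK packet whenever it receives a packet.
  3. The client keeps a close watch on how fast these ACK packets come in, and sends its outgoing packets slightly faster than that, so that the rate ramps up.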

(Packet loss, retransmission and so on are not a concern in the sample code.)

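When I originally wrote this in C#, the client quickly reached the limit of the NIC, about 125 megabytes/sec. When I rewrote the same code using Boost, the speed stalled below 2 megabytes/sec.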


CPU usage for both client and server is 1-2%. There are no memory issues. Server and client both run on localhost. Environment: Windows 10, VS 2013, Boost 1.60.

I have tried using Boost's asynchronous send/receive methods, but that did not seem to help. Related: How to increase throughput of Boost ASIO, UDP client application

Is there a problem with two threads sending on and receiving from the same Boost socket? Related: C++ Socket Server - Unable to saturate CPU

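PS: I started programming in C++ two months ago, so there may well be a very silly mistake somewhere in here, but I cannot see exactly where. Any help or ideas would be most welcome.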

CLIENT:

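#pragma once 
#include <boost/asio.hpp> 
#include <boost/thread.hpp>   // public header, instead of the internal v2/ and detail/ headers 
#include <boost/chrono.hpp> 
#include <boost/lexical_cast.hpp> 
#include <chrono> 
#include <cstdlib>            // system 
#include <iostream>           // std::cout 
#include <memory>             // std::shared_ptr, std::make_shared 
#include <string> 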

#define EXPOSE_IN_DLL __declspec(dllexport) 

namespace dummy{ 

    EXPOSE_IN_DLL class DummyClient 
    { 
     public: 

      // How fast the client is receiving acks - e.g. every 200 ms. 
      double MillisecondsPerAck = 200; 

      //How fast the client should be sending packages (meaning - how long it should sleep between packages) 
      long long MinimumOfMillisecondsToUsePerSentPackage = 200; 

      //How often the code should throttle (calculate new value for 'MinimumOfMillisecondsToUsePerSentPackage') 
      int intervalMilliseconds = 500; 

      //How much faster we should send than receive (how fast we should increase the speed) 
      const double ThrottleAgressiveness = 0.7; 

      //Counters 
      int AcksSinceLastTime = 0; 
      int AcksLastTime = 0; 
      int PackagesSent = 0; 
      int AcksReceived = 0; 
      long long BytesSentAndAcked = 0; 

      //Size of UDP Package to send. IP Layer (NIC) will break larger packages into smaller ones (MTU). 
      static const int PacketSize = 65000; 

      std::shared_ptr<boost::asio::io_service> io_service; 
      std::shared_ptr<boost::asio::ip::udp::socket> socket; 
      std::shared_ptr<boost::asio::ip::udp::endpoint> udpEndPoint; 

      EXPOSE_IN_DLL DummyClient() 
      { 
       boost::thread receiverThread(&DummyClient::SendPackages, this); 
       receiverThread.detach(); 

       using namespace std::chrono; 
       auto started = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); 

       while (true){ 

        boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 

        auto elapsedMilliseconds = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - started; 

        system("cls"); 
        std::cout << "     ############ UDP CLIENT ############ \n\n"; 
        std::cout << "        Packages sent: " << PackagesSent << "\n"; 
        std::cout << "        Acks received: " << AcksReceived << "\n"; 
        std::cout << "       Bytes delivered: " << BytesToSize(BytesSentAndAcked) << "\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 

        if (AcksReceived > 0 && PackagesSent > 0) 
         std::cout << "       Acked percentage: " << 100.0 * AcksReceived/PackagesSent << "\n"; 

        std::cout << "---------------------------------------------------------------------------\n"; 
        std::cout << "         Bandwidth: " << BytesToSize(1000 * BytesSentAndAcked/elapsedMilliseconds) << "/sec\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 
        std::cout << "      MilliSecondsPerAck: " << MillisecondsPerAck << "\n"; 
        std::cout << "      ThrottleAgressiveness: " << ThrottleAgressiveness << "\n"; 
        std::cout << " MinimumOfMillisecondsToUsePerSentPackage: " << MinimumOfMillisecondsToUsePerSentPackage << "\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 
       } 
      } 

      EXPOSE_IN_DLL ~DummyClient(){ 

      }; 

    private: 

     void SendPackages(){ 
      io_service = std::make_shared<boost::asio::io_service>(); 
      socket = std::make_shared<boost::asio::ip::udp::socket>(*io_service); 
      udpEndPoint = std::make_shared<boost::asio::ip::udp::endpoint>(boost::asio::ip::address_v4::from_string("127.0.0.1"), 56000); 
      socket->open(boost::asio::ip::udp::v4()); 

      //Start throttling thread 
      boost::thread throttleThread(&DummyClient::ThrottleThread, this); 

      //Start Receiver thread - that listens for ACK packages from the Dummy server 
      boost::thread receiverThread(&DummyClient::ReceiverThread, this); 

      //Start sending packages - slowly 
      unsigned char dummyData[PacketSize]; 
      auto bufferToSend = boost::asio::buffer(dummyData, PacketSize); 
      for (int i = 0; i < 100000; i++) 
      { 
       //Send 
       socket->send_to(bufferToSend, *udpEndPoint, boost::asio::socket_base::message_do_not_route); 

       PackagesSent++; 

       //Check if we need to sleep a little (throttling) before sending next package 
       if (MinimumOfMillisecondsToUsePerSentPackage > 0) 
        boost::this_thread::sleep_for(boost::chrono::milliseconds(MinimumOfMillisecondsToUsePerSentPackage)); 
      } 
     } 

     //"Acks" are received here - we are just counting them to get the rate they're coming in at. 
     void ReceiverThread(){ 

      //Need to wait until first package is sent - so that the local endpoint is bound 
      while (PackagesSent == 0){ 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 
      } 

      //Set up receiving buffer 
      unsigned char receiveBuffer[100]; 
      auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 100); 
      boost::asio::ip::udp::endpoint senderEndPoint; 

      //Start Receiving! 
      while (true){ 
       socket->receive_from(bufferToReceiveInto, senderEndPoint); 
       AcksReceived++; 
       BytesSentAndAcked += PacketSize; 
      } 
     } 


     //Counts acks per interval - and adjusts outgoing speed accordingly. 
     void ThrottleThread() 
     { 
      while (true) 
      { 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(intervalMilliseconds)); 

       //Find out how many packages we got since last time - and send a little bit faster than this number. 
       if (PackagesSent > 0) 
       { 
        AcksSinceLastTime = AcksReceived - AcksLastTime; 
        AcksLastTime = AcksReceived; 

        if (AcksSinceLastTime == 0) 
        { 
         //No new packages got acked - slow down! 
         MinimumOfMillisecondsToUsePerSentPackage = 200; 
         continue; 
        } 

        //Increase sending speed to a little bit faster than we're receiving packages 
        MillisecondsPerAck = 1.0 * intervalMilliseconds/AcksSinceLastTime; 
        MinimumOfMillisecondsToUsePerSentPackage = MillisecondsPerAck * ThrottleAgressiveness; 
       } 
      } 
     } 


     //Util method 
     std::string BytesToSize(long long Bytes){ 
      float tb = 1099511627776; 
      float gb = 1073741824; 
      float mb = 1048576; 
      float kb = 1024; 

      std::string returnSize = ""; 

      if (Bytes >= tb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/tb); 
       returnSize += " TB"; 
      } 
      else if (Bytes >= gb && Bytes < tb) 
      { 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/gb); 
       returnSize += " GB"; 
      } 
      else if (Bytes >= mb && Bytes < gb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/mb); 
       returnSize += " MB"; 
      } 
      else if (Bytes >= kb && Bytes < mb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/kb); 
       returnSize += " KB"; 
      } 
      else{ 
       returnSize += boost::lexical_cast<std::string>(Bytes); 
       returnSize += " Bytes"; 
      } 
      return returnSize; 
     } 
    }; 
} 

SERVER:

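#pragma once 

#include <boost/asio.hpp> 
#include <boost/thread.hpp>   // public header, instead of the internal v2/ and detail/ headers 
#include <boost/chrono.hpp> 
#include <cstdlib>            // system 
#include <iostream>           // std::cout 
#include <memory>             // std::shared_ptr, std::make_shared 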

#define EXPOSE_IN_DLL __declspec(dllexport) 

using namespace boost::asio; 

namespace dummy{ 
class DummyServer 
{ 
    public: 

     std::shared_ptr<ip::udp::socket> listenSocket; 
     std::shared_ptr<io_service> listenIoService; 

     int PackagesReceived = 0; 

     EXPOSE_IN_DLL DummyServer() 
     { 
      boost::thread receiverThread(&DummyServer::ReceivePackages, this); 
      receiverThread.detach(); 

      //Print status 
      while (true){ 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 
       system("cls"); 
       std::cout << "  ############ UDP SERVER ############ \n\n"; 
       std::cout << "  Packages received: " << PackagesReceived << "\n"; 
      } 
     } 

     EXPOSE_IN_DLL ~DummyServer(){} 


     void ReceivePackages(){ 
      //Start Receiver thread 
      listenIoService = std::make_shared<io_service>(); 
      listenSocket = std::make_shared<ip::udp::socket>(*listenIoService, ip::udp::endpoint(ip::udp::v4(), 56000)); 

      //Set up receiving buffer 
      auto bytesReceived = 0; 
      unsigned char receiveBuffer[1024 * 70]; 
      auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 1024 * 70); 

      unsigned char returnBufferBuffer[10]; 
      auto bufferToSendBackToClient = boost::asio::buffer(returnBufferBuffer, 10); 

      ip::udp::endpoint senderEndPoint; 
      //Receive packages 
      while (true){ 
       listenSocket->receive_from(bufferToReceiveInto, senderEndPoint); 
       PackagesReceived++; 

       //Send an "Ack" package back to client - letting him know that a package successfully made it. 
       //Doesn't matter what the package contains - since client is just counting. 
       listenSocket->send_to(bufferToSendBackToClient, senderEndPoint); 
      } 
    } 
}; 

}

+0

I would lose the C# tag. It is tangential at best. – user4581301

+0

I agree. The C# tag has now been removed. –

+1

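There are a large number of errors in your code. For example, your 'ThrottleThread' can access 'PackagesSent' while the 'SendPackages' thread is modifying it. This can lead to lost increments. You need to protect the shared state with a mutex, or use appropriate atomic types/operations. –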
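As an illustration of the atomic option mentioned in the comment above, here is a minimal sketch. It assumes C++11 `std::atomic` and `std::thread` rather than `boost::thread`, and the names `SharedCounters` and `countFromTwoThreads` are hypothetical, not part of the original code. It only shows that concurrent increments are not lost; it is not a full rewrite of the client.

```cpp
#include <atomic>
#include <thread>

// Hypothetical stand-in for the counters that the sender, receiver and
// throttle threads share in the question's client.
struct SharedCounters {
    std::atomic<int> packagesSent{0};
    std::atomic<int> acksReceived{0};
    std::atomic<long long> bytesSentAndAcked{0};
};

// Runs two threads that each increment the same counter `perThread` times
// and returns the final value. Because the counter is atomic, every
// increment is applied exactly once.
int countFromTwoThreads(int perThread) {
    SharedCounters counters;
    auto work = [&counters, perThread] {
        for (int i = 0; i < perThread; ++i)
            counters.acksReceived.fetch_add(1, std::memory_order_relaxed);
    };
    std::thread a(work);
    std::thread b(work);
    a.join();
    b.join();
    return counters.acksReceived.load();
}
```

Calling `countFromTwoThreads(100000)` should return 200000. Relaxed ordering is enough for a plain statistics counter; state that other data depends on would need stronger ordering or a mutex.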

Answer

1

Found the problem, at last. It was sleep_for that was causing the low speed.

This code

boost::this_thread::sleep_for(boost::chrono::milliseconds(20)); 

often takes 30 ms or more when other threads are doing work.
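To see why an oversleeping `sleep_for` caps the rate, a rough sketch of the arithmetic follows. It assumes one 65000-byte packet is sent per sleep, and it uses `std::this_thread::sleep_for` instead of the Boost equivalent to stay self-contained. The function names `throughputCeiling` and `measureSleepMs` are hypothetical.

```cpp
#include <chrono>
#include <thread>

// Upper bound on throughput in bytes per second when every packet is
// followed by a sleep that really lasts `actualSleepMs` milliseconds.
double throughputCeiling(double packetBytes, double actualSleepMs) {
    return packetBytes * 1000.0 / actualSleepMs;
}

// Measures how long a requested sleep really takes, in milliseconds,
// using a monotonic clock.
double measureSleepMs(int requestedMs) {
    using clock = std::chrono::steady_clock;
    auto start = clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(requestedMs));
    std::chrono::duration<double, std::milli> elapsed = clock::now() - start;
    return elapsed.count();
}
```

With a real sleep of 30 ms, `throughputCeiling(65000, 30)` is about 2.17 million bytes per second, which is in the same range as the roughly 2 megabytes/sec observed in the question. Comparing `measureSleepMs(20)` against 20 on the target machine shows how much the sleep overshoots there.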

I think I will have to use a different throttling strategy (counting packets in flight, etc.), or use a high-precision multimedia timer.
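One possible shape for the "packets in flight" idea is sketched below. This is an assumption about how such a throttle could look, not the code the author ended up with, and `InFlightWindow` is a hypothetical helper. The sender calls `acquire()` before each send and the ACK receiver calls `release()` for each ACK, so the sender blocks on a condition variable instead of sleeping for a fixed time.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper that limits the number of unacknowledged packets.
class InFlightWindow {
public:
    explicit InFlightWindow(std::size_t maxInFlight) : max_(maxInFlight) {}

    // Sender side: blocks while the window is full, then claims a slot.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return inFlight_ < max_; });
        ++inFlight_;
    }

    // Non-blocking variant: claims a slot only if one is free.
    bool tryAcquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (inFlight_ >= max_) return false;
        ++inFlight_;
        return true;
    }

    // Receiver side: frees one slot per ACK and wakes a waiting sender.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (inFlight_ > 0) --inFlight_;
        }
        cv_.notify_one();
    }

    // Current number of packets sent but not yet acknowledged.
    std::size_t inFlight() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return inFlight_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    const std::size_t max_;
    std::size_t inFlight_ = 0;
};
```

Since UDP can drop packets or ACKs, a real sender would also need a timeout on the wait, otherwise a single lost ACK permanently shrinks the window. The sketch leaves that out, in line with the question's note that loss handling is out of scope.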