2013-03-15 83 views
6

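Boost Asio - multiple client connections to different servers

I want to evaluate using asynchronous Boost UDP/TCP socket operations versus synchronous ones for my application. I have been looking for an example similar to my design, but I have not found one, which makes me suspect I may be trying to force asynchronous operations into my design even though that is not the right path.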

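I want to connect to multiple (read: between 1 and 10) servers and communicate with them using different protocols. I have 4-5 threads producing data that needs to be sent over any one of these server connections.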

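My current design is synchronous: it uses one thread and one io_service object per server connection, with a thread-safe queue between the producing threads and each connection thread.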
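The thread-safe queue between the producing threads and a connection thread could look roughly like the sketch below. It is only an illustration under assumptions: the question does not show its queue, so `BlockingQueue` and `OutboundMessage` are hypothetical names, and the sketch uses the C++11 standard library rather than Boost.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical message type; the question does not say what the producers emit.
struct OutboundMessage {
    int connectionId;     // which server connection should send this
    std::string payload;  // bytes to send
};

// Minimal blocking queue: many producers push, one connection thread pops.
template <typename T>
class BlockingQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(value));
        }
        ready_.notify_one();  // wake one waiting consumer
    }

    // Blocks until an item is available or close() has been called.
    // Returns false only when the queue is closed and fully drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    // Lets blocked consumers exit once the remaining items are consumed.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
    bool closed_ = false;
};
```

Each connection thread would loop on `pop()` and perform a blocking send for every message, so one slow server stalls only its own thread. The cost is one thread per connection, which is the scalability concern raised in the question.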

This design does not seem scalable in terms of throughput, which is what I want to maximize.

Are there any examples that show this pattern of multiple connections to different servers?

Answers

2

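I have written a client that connects to 6 different servers over TCP/IP with SSL/TLS, implemented with ASIO. All 6 use the same protocol. So, in case it helps, here is my code: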

SSLSocket.H

#pragma once 

#include <cstdlib> 
#include <iostream> 
#include <queue> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/asio/ssl.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/mutex.hpp> 
#include <boost/shared_ptr.hpp> 
using namespace std; 
// 
#include "BufferManagement.h" 
#include "Logger.h" 
#include "Common Classes\Locking.h" 
#include "Message.h" 

class SSLSocket; 
class ConcurrentMsgQueue; 

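// Note: to have any effect, this macro must be defined before <boost/asio.hpp> is included
// (for example in the project settings); defined here, after the include, it does nothing.
#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 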

typedef void (__stdcall *Callback)(const SSLSocket* pSSLS, const int bytesInMsg, const void* pBuf); 

// typedef std::vector<boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SocketVectorType; 

enum {MsgLenBytes = 4}; 

class SSLSocket 
{ 
    // This class handles all communications between the client and the server 
    // using TCP/IP with SSL/TLS (TLS v1). The Boost ASIO (Asynchronous I/O) library is used to accomplish this. 
    // Initially written by Bob Bryan on 1/21/2013. 
    // 
public: 
    SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, const LogLevel levelOfLog, const string& logFileName, const int bufMangLen); 
    ~SSLSocket(); 
    void Connect(SSLSocket* psSLS, const string& serverPath, string& port); 
    void SendToServer(const int bytesInMsg, Byte* pBuf); 
    void Stop(); 

    static void SetCallback(Callback callbackFunction) 
    { 
     // This method is required in order to be able to do a reverse pinvoke from C#. 
     // This callback function pointer is what is used to communicate back to the C# code. 
     CallbackFunction = callbackFunction; 
    } 

    static Byte* AllocateMem(int length) 
    { 
     // Allocate some memory. This method winds up getting called when the C# client needs to allocate some memory for a message. 
     Byte* pBuf = BufMang.GetPtr(length); 
     return pBuf; 
    } 
    // 
    static Logger Log; // Object used to log info to a file and/or to the console. 
    static Callback CallbackFunction; // Callback function object used to communicate with the worker thread in C#. 

private: 
    void InitAsynchIO(); 
    void HandleConnect(const boost::system::error_code& error); 
    void HandleHandshake(const boost::system::error_code& error); 
    void HandleFirstWrite(const boost::system::error_code& error, size_t bytes_transferred); 
    void HandleRead(const boost::system::error_code& error, size_t bytesTransferred); 
    // void HandleRead(const boost::system::error_code& error, size_t bytes_transferred); 
    void Terminate(); 
    void static RcvWorkerThread(SSLSocket* sSLS); 
    void static SendWorkerThread(SSLSocket* psSLS); 
    void ProcessSendRequests(); 
    void HandleWrite(const boost::system::error_code& error, size_t bytesTransferred); 
    static void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service); 
    // 
    struct Bytes 
    { 
     // Used to convert 4 bytes to an int. 
     unsigned char B1; 
     unsigned char B2; 
     unsigned char B3; 
     unsigned char B4; 
    }; 

    union Bytes4ToInt 
    { 
     // Converts 4 bytes to an int. 
     int IntVal; 
     Bytes B; 
    }; 

    inline int BytesToInt(const Byte * pBuf) 
    { 
     // This method converts 4 bytes from an array of bytes to a 4-byte int. 
     B2I.B.B1 = *pBuf++; 
     B2I.B.B2 = *pBuf++; 
     B2I.B.B3 = *pBuf++; 
     B2I.B.B4 = *pBuf; 
     int Value = B2I.IntVal; 
     return Value; 
    } 
    // 
    boost::thread_group WorkerThreads; // Used to handle creating threads. 
    CRITICAL_SECTION SocketLock; // Used in conjunction with the Locking object to handle single threading the code. 
    boost::asio::ssl::stream<boost::asio::ip::tcp::socket>* pSocket; // Pointer to the socket object. 
    Bytes4ToInt B2I; // Used to translate 4 bytes in the buffer to an int representing the number of bytes in the msg. 
    std::string sClientIp; // Client IP address. Used for logging. 
    unsigned short uiClientPort; // Port number. Used for logging. 
    // static MessageList* pRepMsgs; // Link list of the msgs to send to the server. 
    Byte* pDataBuf; // Pointer to the data for the current message to be read. 
    static boost::shared_ptr<boost::asio::io_service> IOService; // Object required for use by ASIO to perform certain functions. 
    static bool RcvThreadCreated; // Set when the rcv thread is created so that it won't try to create it again. 
    static int StaticInit; // Indicates whether or not the static members have been initialized or not. 
    static bool DisplayInHex; // Specifies whether to display a buffer in hex or not. 
    static BufferManagement BufMang; // Smart pointer to the buffer used to handle requests coming to and from the server for all sockets. 
    volatile static bool ReqAlive; // Used to indicate whether the request thread should die or not. 
    // static bool RepAlive; // Used to indicate whether the response thread should die or not. 
    static ConcurrentMsgQueue SendMsgQ; // Holds the messages waiting to be sent to the server. 
    static HANDLE hEvent; // Used for signalling between threads. 
}; 

SSLSocket.cpp

#include "StdAfx.h" 
#include "SSLSocket.h" 

boost::shared_ptr<boost::asio::io_service> SSLSocket::IOService; 
int SSLSocket::StaticInit = 0; 
Callback SSLSocket::CallbackFunction; 
BufferManagement SSLSocket::BufMang; 
volatile bool SSLSocket::ReqAlive = true; 
Logger SSLSocket::Log; 
HANDLE SSLSocket::hEvent; 
bool SSLSocket::DisplayInHex; 
ConcurrentMsgQueue SSLSocket::SendMsgQ; 
bool SSLSocket::RcvThreadCreated = 0; 
BufferManagement* Message::pBufMang; 

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, 
    const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0) 
{ 
    // SSLSocket Constructor. 
    // If the static members have not been initialized yet, then initialize them. 
    if (!StaticInit) 
    { 
     DisplayInHex = displayInHex; 
     BufMang.Init(bufMangLen); 
     Message::SetBufMang(&BufMang); 
     // This constructor enables logging according to the vars passed in. 
     Log.Init(logToFile, logToConsole, levelOfLog, logFileName); 
     // Create the crit section object 
     // Locking::InitLocking(ReadLock); 
     // Locking::InitLocking(WriteLock); 
     StaticInit++; 
     hEvent = CreateEvent(NULL, false, false, NULL); 
     // Define the ASIO IO service object. 
     // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service); 
     boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service); 
     IOService = IOServ; 
    } 
} 

SSLSocket::~SSLSocket(void) 
{ 
    delete pSocket; 
    if (--StaticInit == 0) 
     CloseHandle(hEvent); 
} 

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port) 
{ 
    // Connects to the server. 
    // serverPath - specifies the path to the server. Can be either an ip address or url. 
    // port - port server is listening on. 
    // 
    try 
    { 
     Locking CodeLock(SocketLock); // Single thread the code. 
     // If the user has tried to connect before, then make sure everything is clean before trying to do so again. 
     if (pSocket) 
     { 
     delete pSocket; 
     pSocket = 0; 
     }                         
     // If serverPath is a URL, then resolve the address. 
     // Note that this code expects the first server to always have a url. 
     if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr. 
     { 
     // Create the resolver and query objects to resolve the host name in serverPath to an ip address. 
     boost::asio::ip::tcp::resolver resolver(*IOService); 
     boost::asio::ip::tcp::resolver::query query(serverPath, port); 
     boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // The thread we are on now, is most likely the user interface thread. Create a thread to handle all incoming socket work messages. 
     if (!RcvThreadCreated) 
     { 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this)); 
      RcvThreadCreated = true; 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this)); 
     } 
     // Try to connect to the server. Note - add timeout logic at some point. 
     boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator, 
      boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
     // serverPath is an ip address, so try to connect using that. 
     // 
     // Create an endpoint with the specified ip address. 
     const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath)); 
     int iport = atoi(port.c_str()); 
     const boost::asio::ip::tcp::endpoint EP(IP, iport); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // Try to connect to the server. Note - add timeout logic at some point. 
     //pSocket->core_.engine_.do_connect(void*, int); 
     // pSocket->next_layer_.async_connect(EP, &SSLSocket::HandleConnect) 
     // pSocket->next_layer().async_connect(EP, &SSLSocket::HandleConnect); 
     boost::system::error_code EC; 
     pSocket->next_layer().connect(EP, EC); 
     if (EC) 
     { 
      // Log an error. This worker thread should exit gracefully after this. 
      stringstream ss; 
      ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << EC.message() + ".\n"; 
      Log.LogString(ss.str(), LogError); 
     } 
     HandleConnect(EC); 
     // boost::asio::async_connect(pSocket->lowest_layer(), EP, 
     // boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf) 
{ 
    // This method creates a msg object and saves it in the SendMsgQ object. 
    // sends the number of bytes specified by bytesInMsg in pBuf to the server. 
    // 
    Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf); 
    SendMsgQ.Push(pMsg); 
    // Signal the send worker thread to wake up and send the msg to the server. 
    SetEvent(hEvent); 
} 


void SSLSocket::SendWorkerThread(SSLSocket* psSLS) 
{ 
    // This is the thread method that gets called to process the messages to be sent to the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->ProcessSendRequests(); 
} 

void SSLSocket::ProcessSendRequests() 
{ 
    // This method handles sending msgs to the server. 
    // 
    std::stringstream ss; 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Front(); 
      SSLSocket* pSSL = pMsg->pSSL; 
      SendMsgQ.Pop(); 
      const Byte* pBuf = pMsg->pBuf; 
      const int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      { 
       Locking CodeLock(SocketLock); // Single thread the code. 
       boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this, 
        boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
      } 
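      ss.str(""); // Clear the text left over from the previous iteration so each log entry stands alone. 
      ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n"; 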
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. Nothing to do really since reading is handled by the HandleRead method. 

    std::stringstream ss; 
    try 
    { 
     if (error) 
     { 
     ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleWrite: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS) 
{ 
    // This is the method that gets called when the receive thread is created by this class. 
    // This thread method focuses on processing messages received from the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->InitAsynchIO(); 
} 

void SSLSocket::InitAsynchIO() 
{ 
    // This method is responsible for initiating asynch i/o. 
    boost::system::error_code Err; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for asynch i/o. The thread will hang here until the stop method has been called or an error occurs. 
     // Add a work object so the thread will be dedicated to handling asynch i/o. 
     boost::asio::io_service::work work(*IOService); 
     IOService->run(); 
     Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleConnect(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the connect request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     pSocket->async_handshake(boost::asio::ssl::stream_base::client, 
      boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error)); 
     ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     } 
     else 
     { 
     // Log an error. This worker thread should exit gracefully after this. 
     ss << "SSLSocket::HandleConnect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << error.message() + ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleConnect: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleHandshake(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the handshake request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // Try to send the first message that the server is expecting. This msg tells the server we want to start communicating. 
     // This is the only msg specified in the C++ code. All other msg processing is done in the C# code. 
     // 
     unsigned char Msg[27] = {0x17, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x41, 
      0x74, 0x74, 0x61, 0x63, 0x6b, 0x50, 0x6f, 0x6b, 0x65, 0x72, 0x02, 0x00, 0x65, 0x6e}; 
     boost::system::error_code Err; 

     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(Msg), boost::bind(&SSLSocket::HandleFirstWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(27), Err); 
     if (Err) 
     { 
      ss << "SSLSocket::HandleHandshake: write failed - " << Err.message() << ".\n"; 
      Log.LogString(ss.str(), LogInfo); 
     } 
     HandleFirstWrite(Err, Count); 
     // boost::asio::async_write(pSocket, boost::asio::buffer(Msg, 27), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     ss.str(""); 
     ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     } 
     else 
     { 
     ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n"; 
     IOService->stop(); 
     } 
     Log.LogString(ss.str(), LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, bytesTransferred), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, 84), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // Locking CodeLock(ReadLock); // Single thread the code. 
     // Signal the other threads that msgs are now ready to be sent and received. 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pRepBuf), boost::asio::transfer_exactly(4), boost::bind(&SSLSocket::HandleRead, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // 
     // Notify the UI that we are now connected. Create a 6 byte msg for this. 
     pDataBuf = BufMang.GetPtr(6); 
     BYTE* p = pDataBuf; 
     // Create msg type 500 
     *p = 244; 
     *++p = 1; 
     CallbackFunction(this, 2, (void*)pDataBuf); 
     // Get the 1st 4 bytes of the next msg, which is always the length of that msg. 
     pDataBuf = BufMang.GetPtr(MsgLenBytes); 

     // int i1=1,i2=2,i3=3,i4=4,i5=5,i6=6,i7=7,i8=8,i9=9; 
     // (boost::bind(&nine_arguments,_9,_2,_1,_6,_3,_8,_4,_5,_7)) 
     //  (i1,i2,i3,i4,i5,i6,i7,i8,i9); 

     // boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, _1,_2,_3)) 
     // (this, pReqBuf, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); 
     // boost::asio::async_read(*pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), boost::bind(&Client::handle_read, 
     //  this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 

     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this, 
      boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     } 
     else 
     { 
     ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called to process an incoming message. 
    // 
    std::stringstream ss; 
    int ByteCount; 
    try 
    { 
     ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Set to exit this thread if the user is done. 
     if (!ReqAlive) 
     { 
     // IOService->stop(); 
     return; 
     } 
     if (!error) 
     { 
     // Get the number of bytes in the message. 
     if (bytesTransferred == 4) 
     { 
      ByteCount = BytesToInt(pDataBuf); 
     } 
     else 
     { 
      // Call the C# callback method that will handle the message. 
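      ss.str(""); // The text logged above is still in the stream; clear it before building the next entry. 
      ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n"; 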
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3); 
      Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2); 
      CallbackFunction(this, bytesTransferred, (void*)pDataBuf); 
      // Prepare to read in the next message length. 
      ByteCount = MsgLenBytes; 
     } 
     pDataBuf = BufMang.GetPtr(ByteCount); 
     boost::system::error_code Err; 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::read(pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), Err); 
     } 
     else 
     { 
     Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::Stop() 
{ 
    // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on. If this is not done, then an exception will be thrown 
    // when it comes time to delete this object. 
    ReqAlive = false; 
    SetEvent(hEvent); 
    IOService->stop(); 
} 

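So, the key points here:

  1. A new instance of the SSLSocket class is created the first time a connection is made to a server. The io_service object is static and is created only once. It is shared by all 6 instances of the SSLSocket class.

  2. Two threads handle everything related to socket communication with all 6 servers. One thread processes messages received from the servers. The other thread sends messages to the servers.

  3. This code uses SSL/TLS. If you are using plain TCP, you can simply remove the 3 SSL-related lines in the SSLSocket::Connect method along with the ssl #include line.

  4. HandleRead uses a two-read approach. The first read gets the byte count (because the protocol uses the first 4 bytes as the message length), and the second read gets the total number of bytes in that message. This may not be the most efficient or even the most desirable way to read data from a socket, but it is the easiest to understand. If your protocol is different, and/or your messages are larger and you are able to start processing a message before all of it has been received, you might consider a different approach.

  5. This code uses Boost 1.52.0 with Visual Studio 2008 on Windows.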
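The two-read framing from point 4 can be sketched without any socket code, which makes the logic easier to test in isolation. This is a minimal sketch under assumptions, not the answer's implementation: `DecodeLength` and `FrameReader` are hypothetical names, and it assumes the 4-byte length is little-endian and counts only the body. That assumption is inferred from the hard-coded first message above, which starts with 0x17 0x00 0x00 0x00 (23) and is 27 bytes long in total.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Size of the length prefix, matching MsgLenBytes in the code above.
constexpr std::size_t kMsgLenBytes = 4;

// Decode a 4-byte little-endian length with shifts, so the result does not
// depend on union type punning or on the byte order of the host.
inline std::uint32_t DecodeLength(const unsigned char* p) {
    return static_cast<std::uint32_t>(p[0]) |
           (static_cast<std::uint32_t>(p[1]) << 8) |
           (static_cast<std::uint32_t>(p[2]) << 16) |
           (static_cast<std::uint32_t>(p[3]) << 24);
}

// Hypothetical helper: accumulates received bytes and hands back complete message bodies.
class FrameReader {
public:
    // Append bytes as they arrive from the socket, in chunks of any size.
    void Feed(const unsigned char* data, std::size_t len) {
        buffer_.insert(buffer_.end(), data, data + len);
    }

    // If a complete message is buffered, copy its body into `body`,
    // drop it from the buffer, and return true.
    bool Next(std::vector<unsigned char>& body) {
        if (buffer_.size() < kMsgLenBytes) return false;            // step 1: need the length prefix
        const std::size_t bodyLen = DecodeLength(buffer_.data());
        if (buffer_.size() - kMsgLenBytes < bodyLen) return false;  // step 2: need the whole body
        const auto first = buffer_.begin() + static_cast<std::ptrdiff_t>(kMsgLenBytes);
        const auto last = first + static_cast<std::ptrdiff_t>(bodyLen);
        body.assign(first, last);
        buffer_.erase(buffer_.begin(), last);
        return true;
    }

private:
    std::vector<unsigned char> buffer_;
};
```

Unlike HandleRead, this version does not infer the read phase from the number of bytes transferred, so a message whose body happens to be exactly 4 bytes long is not mistaken for a length prefix.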

+0

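Thanks, this gives me an idea of how to do it. As Sam said, going the asynchronous route may not be productive until there are more connections. For now I will keep it in my back pocket. – RishiD 2013-03-15 17:23:26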

+0

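This code does not look correct. What is the purpose of Locking CodeLock(SocketLock); boost::asio::async_write(...);? The comment seems to indicate that this is supposed to be single threaded. The mutex is insufficient here, because the application needs to ensure that each stream has at most one outstanding write operation. – 2013-03-20 22:48:00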

+0

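When I first tried this, I was not sure how many threads would be used, and I was worried that multiple threads might try to use the same socket object, which, as you know, is not safe. I later refined the code to use a single thread for reading and a single thread for writing, but left the locks in. I should take them out. What do you mean by "the mutex is insufficient here, because the application needs to ensure that each stream has at most one outstanding write operation"? What would you do instead? Have you looked at the HTTP Server 3 example? – 2013-03-21 05:40:14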
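One common way to guarantee at most one outstanding write per stream is to queue outgoing messages and start the next write only from the completion handler of the previous one. The lock in the answer is released as soon as boost::asio::async_write returns, while the composed write may still be in progress, so a second async_write on the same stream can overlap with it. The sketch below is an assumption-laden illustration, not code from either commenter: `SerializedWriter` is a hypothetical name, and the asynchronous write is abstracted as a callable so that the sketch compiles without Boost. With Asio, that callable would wrap boost::asio::async_write on the stream.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical wrapper that serializes writes on one stream.
class SerializedWriter {
public:
    using Completion = std::function<void(bool ok)>;
    // Starts an asynchronous write of `data` and later invokes `done` exactly once.
    using StartWrite = std::function<void(const std::string& data, Completion done)>;

    explicit SerializedWriter(StartWrite startWrite) : startWrite_(std::move(startWrite)) {}

    // Not thread-safe by itself: call it from the thread that runs the stream's handlers.
    void Send(std::string data) {
        pending_.push_back(std::move(data));
        if (!writing_) WriteFront();  // start a write only if none is in flight
    }

    std::size_t Pending() const { return pending_.size(); }

private:
    void WriteFront() {
        writing_ = true;
        // The front element stays in the deque until completion, so the buffer outlives the write.
        startWrite_(pending_.front(), [this](bool ok) { OnWriteDone(ok); });
    }

    void OnWriteDone(bool ok) {
        pending_.pop_front();
        if (!ok) {  // on failure, drop everything still queued for this stream
            pending_.clear();
            writing_ = false;
            return;
        }
        if (pending_.empty()) writing_ = false;
        else WriteFront();  // chain the next write from the completion handler
    }

    StartWrite startWrite_;
    std::deque<std::string> pending_;
    bool writing_ = false;
};
```

Producer threads would hand messages to the I/O thread (for example by posting to the io_service) instead of calling `Send` directly, which removes the need for a lock around the socket.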

1

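There is no direct example of a one-to-many client-server design among the Asio examples. If your design is fixed at a maximum of 10 connections, using synchronous communication with one thread per connection should be fine. However, if you plan to scale well beyond that, it is easy to see the diminishing returns of creating hundreds or thousands of threads.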

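That said, using async_connect together with async_read and async_write is not difficult to understand or implement. I have used this same concept to manage a few thousand connections on the world's fastest supercomputer using only a handful of threads. If you choose to go this route, the async TCP client example is probably the best one to study.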

If you are looking for more than just examples, there are several open source projects that use Asio which you may find useful.

+0

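Thanks for the input. I guess I am still trying to wrap my head around asynchronous communication. I understand the value on the server side, but I cannot see the value on the client side. – RishiD 2013-03-15 17:25:59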
