2012-02-29 51 views
29

我想在Linux平臺中探索協議緩衝區(PB),我的編碼語言是C++。我在協議緩衝區在線文檔中找到了一些示例,但沒有具體給套接字發送和接收(或者我完全錯過了:))。所以我決定在實際消息之前添加消息長度並通過套接字發送。如果有人能夠提出比我打算做的更好的解決方案,我會很感激,而且還有什麼東西可以在PB中製作出來,用於創建這樣的數據包。C++中的套接字協議緩衝區

但我仍然在服務器端出現問題,我必須解碼數據包。假設客戶端發送一個10字節的數據包,其中前4個字節是數據包的長度;但在解碼數據包之前不可能知道長度。所以,即使我讀了前4個字節,我如何使用協議緩衝區推斷出半讀數據包的值。

回答

25

最後我可以得到它的工作。我在這裏發佈代碼,以便人們可以查看和評論它,以及如果有人想用C++實現它,這段代碼可以提供幫助。這是一個破舊的代碼,我的意圖是讓Protobuf以前綴方式工作。我從一些我不記得的網站獲取客戶端服務器的代碼,並且修改了它以適應protobuf。在這裏,服務器首先窺探套接字並獲取總數據包的長度,然後實際讀取套接字以讀取整個數據包。可以有萬億種方法來做到這一點,但爲了快速解決問題,我以這種方式做到了。但我需要找到一個更好的方法來避免每個數據包2 recv,但在我的情況下,所有的消息都是不同的大小,所以這是我猜測的唯一方法。

原型文件

message log_packet { 
    required fixed64 log_time =1; 
    required fixed32 log_micro_sec =2; 
    required fixed32 sequence_no =3; 
    required fixed32 shm_app_id =4; 
    required string packet_id =5; 
    required string log_level=6; 
    required string log_msg=7; 
    } 

協議緩衝客戶端代碼

#include <unistd.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/message.h> 
#include <google/protobuf/descriptor.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 


using namespace google::protobuf::io; 

using namespace std; 
int main(int argv, char** argc){ 

/* Coded output stram */ 

log_packet payload ; 

payload.set_log_time(10); 
payload.set_log_micro_sec(10); 
payload.set_sequence_no(1); 
payload.set_shm_app_id(101); 
payload.set_packet_id("TST"); 
payload.set_log_level("DEBUG"); 
payload.set_log_msg("What shall we say then"); 

cout<<"size after serilizing is "<<payload.ByteSize()<<endl; 
int siz = payload.ByteSize()+4; 
char *pkt = new char [siz]; 
google::protobuf::io::ArrayOutputStream aos(pkt,siz); 
CodedOutputStream *coded_output = new CodedOutputStream(&aos); 
coded_output->WriteVarint32(payload.ByteSize()); 
payload.SerializeToCodedStream(coded_output); 

     int host_port= 1101; 
     char* host_name="127.0.0.1"; 

     struct sockaddr_in my_addr; 

     char buffer[1024]; 
     int bytecount; 
     int buffer_len=0; 

     int hsock; 
     int * p_int; 
     int err; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n",errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n",errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = inet_addr(host_name); 
     if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       if((err = errno) != EINPROGRESS){ 
         fprintf(stderr, "Error connecting socket %d\n", errno); 
         goto FINISH; 
       } 
     } 




     for (int i =0;i<10000;i++){ 
      for (int j = 0 ;j<10;j++) { 

       if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) { 
         fprintf(stderr, "Error sending data %d\n", errno); 
         goto FINISH; 
       } 
       printf("Sent bytes %d\n", bytecount); 
       usleep(1); 
     } 
     } 
     delete pkt; 

FINISH: 
     close(hsock); 

} 

協議緩衝服務器代碼

#include <fcntl.h> 
#include <string.h> 
#include <stdlib.h> 
#include <errno.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <resolv.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 

using namespace std; 
using namespace google::protobuf::io; 



void* SocketHandler(void*); 

int main(int argv, char** argc){ 

     int host_port= 1101; 

     struct sockaddr_in my_addr; 

     int hsock; 
     int * p_int ; 
     int err; 

     socklen_t addr_size = 0; 
     int* csock; 
     sockaddr_in sadr; 
     pthread_t thread_id=0; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n", errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n", errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = INADDR_ANY ; 

     if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); 
       goto FINISH; 
     } 
     if(listen(hsock, 10) == -1){ 
       fprintf(stderr, "Error listening %d\n",errno); 
       goto FINISH; 
     } 

     //Now lets do the server stuff 

     addr_size = sizeof(sockaddr_in); 

     while(true){ 
       printf("waiting for a connection\n"); 
       csock = (int*)malloc(sizeof(int)); 
       if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){ 
         printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); 
         pthread_create(&thread_id,0,&SocketHandler, (void*)csock); 
         pthread_detach(thread_id); 
       } 
       else{ 
         fprintf(stderr, "Error accepting %d\n", errno); 
       } 
     } 

FINISH: 
;//oops 
} 

google::protobuf::uint32 readHdr(char *buf) 
{ 
    google::protobuf::uint32 size; 
    google::protobuf::io::ArrayInputStream ais(buf,4); 
    CodedInputStream coded_input(&ais); 
    coded_input.ReadVarint32(&size);//Decode the HDR and get the size 
    cout<<"size of payload is "<<size<<endl; 
    return size; 
} 

void readBody(int csock,google::protobuf::uint32 siz) 
{ 
    int bytecount; 
    log_packet payload; 
    char buffer [siz+4];//size of the payload and hdr 
    //Read the entire buffer including the hdr 
    if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     } 
    cout<<"Second read byte count is "<<bytecount<<endl; 
    //Assign ArrayInputStream with enough memory 
    google::protobuf::io::ArrayInputStream ais(buffer,siz+4); 
    CodedInputStream coded_input(&ais); 
    //Read an unsigned integer with Varint encoding, truncating to 32 bits. 
    coded_input.ReadVarint32(&siz); 
    //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
    //from reading beyond that length.Limits are used when parsing length-delimited 
    //embedded messages 
    google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); 
    //De-Serialize 
    payload.ParseFromCodedStream(&coded_input); 
    //Once the embedded message has been parsed, PopLimit() is called to undo the limit 
    coded_input.PopLimit(msgLimit); 
    //Print the message 
    cout<<"Message is "<<payload.DebugString(); 

} 

void* SocketHandler(void* lp){ 
    int *csock = (int*)lp; 

     char buffer[4]; 
     int bytecount=0; 
     string output,pl; 
     log_packet logp; 

     memset(buffer, '\0', 4); 

     while (1) { 
     //Peek into the socket and get the packet size 
     if((bytecount = recv(*csock, 
         buffer, 
           4, MSG_PEEK))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     }else if (bytecount == 0) 
       break; 
     cout<<"First read byte count is "<<bytecount<<endl; 
     readBody(*csock,readHdr(buffer)); 
     } 

FINISH: 
     free(csock); 
    return 0; 
} 
+0

非常感謝好友。 – 2013-11-14 10:49:21

+0

對於問題+1和對答案+1,這是有幫助的。 – 2013-11-14 11:08:23

+1

謝謝!這對我遇到的問題非常有幫助 – 2013-11-18 20:20:37

10

不幸的是,protobuf的不適用於「包裝」(定界)你的消息提供了一種方式:

如果你想多封郵件寫到一個文件或流,它 是你保持跟蹤一條消息結束的位置,並且下一個 開始。協議緩衝區連線格式不是自行分隔的,所以 協議緩衝區分析程序無法確定消息在其所屬的 上的結束位置。解決此問題的最簡單方法是在寫入消息之前寫入每條消息的大小 。

(從他們documentation

所以,他們基本上建議您得出了相同的解決方案。

+0

謝謝塔馬斯,所以我會用我的方法去 – punith 2012-02-29 09:40:04