2017-03-18 44 views
2

我在Thrift定義了一組structs如下列:在C++串行化的串行化節儉結構卡夫卡

struct Foo { 
    1: i32 a, 
    2: i64 b 
} 

我需要做在C++以下:

(a)中序列化Foo的實例轉換爲Thrift兼容字節(使用BinaryCompact Thrift協議)

(b)發送字節序列化的實例ES到Kafka話題

問題

如何我做的Thrift序列化實例發送到Kafka集羣?

在此先感謝

+0

的可能的複製[節儉:是否有可能做只系列化與C++庫節儉(http://stackoverflow.com/questions/12328896/thrift-is-it-possible-to-do-只有-系列化,與-C-節儉庫) – JensG

回答

2

想出了我自己的問題的答案。下面

系列化

代碼段示出了如何序列的FooThrift兼容字節的實例(使用節儉Compact協議)。爲了使用Binary協議,請將TCompactProtocol替換爲TBinaryProtocol

#include <thrift/transport/TBufferTransports.h> 
#include <thrift/protocol/TCompactProtocol.h> 

using apache::thrift::protocol::TCompactProtocol; 
using apache::thrift::transport::TMemoryBuffer; 

... 
... 
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer()); 
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer)); 
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *))); 
uint32_t num_bytes = 0; 

// 'foo' is an instance of Foo 
foo->write(protocol.get()); 
buffer->getBuffer(serialized_bytes, &num_bytes); 

發送到卡夫卡簇

下面的代碼段示出了如何將節儉兼容字節發送到卡夫卡羣集。

備註:下面使用的kafka客戶端庫是librdkafka

#include "rdkafkacpp.h" 

std::string errstr; 

// Create global configuration 
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
conf->set("metadata.broker.list", "localhost:9092", errstr); 
conf->set("api.version.request", "true", errstr); 

// Create kafka producer 
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 

// Create topic-specific configuration 
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr); 

auto partition = 1; 

// Sending the serialized bytes to Kafka cluster 
auto res = producer->produce(
    topic, partition, 
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
    serialized_bytes, num_bytes, 
    NULL, NULL); 

    if (res != RdKafka::ERR_NO_ERROR) { 
    std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl; 
    } else { 
    std::cout << "Published message of " << num_bytes << " bytes" << std::endl; 
    } 

producer->flush(10000);