2017-03-18 7 views
2

나는 다음과 같은 Thrift에 정의 structs의 집합이 있습니다

struct Foo { 
    1: i32 a, 
    2: i64 b 
} 

나는 C++에서 다음을 수행해야합니다

의 (a) 직렬화를 (또는 Compact 드리프트 프로토콜을 사용하는) Foo의 인스턴스

(b) 바이트 직렬화 된 인스턴스 Kafka 주제

질문

에스는 내가 Kafka 클러스터에 Thrift 직렬화 된 인스턴스를 보내 어떻게해야합니까?

미리 감사드립니다.

+0

의 사용 가능한 복제 [드리프트 :이 C++ 드리프트 라이브러리 만 직렬화를 할 수 있는가] (http://stackoverflow.com/questions/12328896/thrift-is-it-possible-to-do- 전용 직렬화와 함께 C 쓰레드 라이브러리) – JensG

답변

2

내 자신의 질문에 대한 대답을 알아 냈습니다.

직렬화

코드 조각 아래 (중고품 Compact 프로토콜을 사용하여) FooThrift 바이트에 호환 인스턴스를 직렬화하는 방법을 도시한다. Binary 프로토콜을 사용하려면 TCompactProtocolTBinaryProtocol으로 대체하십시오. 카프카 클러스터 다음 코드는 카프카 클러스터에 드리프트 호환 바이트를 전송하는 방법을 보여줍니다

에 보내기

#include <thrift/transport/TBufferTransports.h> 
#include <thrift/protocol/TCompactProtocol.h> 

using apache::thrift::protocol::TCompactProtocol; 
using apache::thrift::transport::TMemoryBuffer; 

... 
... 
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer()); 
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer)); 
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *))); 
uint32_t num_bytes = 0; 

// 'foo' is an instance of Foo 
foo->write(protocol.get()); 
buffer->getBuffer(serialized_bytes, &num_bytes); 

.

참고 : 아래에 사용 된 kafka 클라이언트 라이브러리는 librdkafka입니다.

#include "rdkafkacpp.h" 

std::string errstr; 

// Create global configuration 
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
conf->set("metadata.broker.list", "localhost:9092", errstr); 
conf->set("api.version.request", "true", errstr); 

// Create kafka producer 
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 

// Create topic-specific configuration 
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr); 

auto partition = 1; 

// Sending the serialized bytes to Kafka cluster 
auto res = producer->produce(
    topic, partition, 
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
    serialized_bytes, num_bytes, 
    NULL, NULL); 

    if (res != RdKafka::ERR_NO_ERROR) { 
    std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl; 
    } else { 
    std::cout << "Published message of " << num_bytes << " bytes" << std::endl; 
    } 

producer->flush(10000);