2014-07-09 2 views
0

나는 생산자와 번들 및 저장소를 얻을 수있는 소비자로부터 중고품 번들을 보내려고하지만 나는 공식 예제를 사용하여 생산자와 소비자 greoup을 만든구성 카프카는

을 카프카 꽤 새로운 오전 번들 배열.

내가

KeyedMessage<String, Bundle> data = new KeyedMessage<String, Bundle>("bundles", "Bundle", bundle); 
     producer.send(data); 

그러나 소비자 측에서 생산자 측 코드를 작성했습니다

나는

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(NO_OF_THREADS)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> messageStreams = consumerMap.get(topic); 

내가 카프카 소비자가 번들 형식이 아닌 바이트 [] 배열로 데이터를 얻을 수 있도록 할 수 있습니다 .

답변

1

는 직접 번들 형으로 값을 디코딩하기 위해 다음과 같은 방법을 사용할 수 있습니다 :

public interface kafka.javaapi.consumer.ConsumerConnector { 
    ... 
    public <K,V> Map<String, List<KafkaStream<K,V>>> 
    createMessageStreams(
     Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder); 

을 당신이 당신의 자신의 valueDecoder 해당 드리프트 API를 사용하여 유형 Decoder<Bundle>의 구현해야합니다이 경우.

Kafka 문서의 High Level Consumer API 설명을 참조하십시오.