package com.hortonworks.example.kafka.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
// Source: "Configuring Kafka for Kerberos Over Ambari", May 9, 2016 (p. 9)
import java.util.Collections;
import java.util.Properties;
public class BasicConsumerExample {
public static void main(String[] args) {`enter code here`
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.
example.com:6667");
// specify the protocol for SSL Encryption
consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"SASL_PLAINTEXT");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.
apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerConfig);
TestConsumerRebalanceListener rebalanceListener = new
TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("test-topic"),
rebalanceListener);
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.printf("Received Message topic =%s, partition =%s,
offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
private static class TestConsumerRebalanceListener implements
ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
System.out.println("Called onPartitionsRevoked with partitions:" +
partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
System.out.println("Called onPartitionsAssigned with partitions:" +
partitions);
}
}
}
무엇이 잘못되었는지 확인할 수 있도록 소비자 코드를 공유해 주실 수 있습니까? – Kaushal
@Kaushal, 아래 코드를 게시했습니다. – Sanj
클러스터에서 소비자 jar를 로컬로 실행하십시오. –