2017-04-05 2 views
0

Kafka Producer와 Kafka 클라이언트가 다른 Hadoop 클러스터에있는 반면 내 로컬 컴퓨터에서 Kafka Consumer (Java 코드)를 실행 하시겠습니까?Kafka 로컬 컴퓨터에서 실행되는 Consumer-java API

kafka 제작자 스크립트 (kafka-console-producer.sh)를 사용하여 kafka 클러스터에 메시지를 보낼 수 있지만 로컬 이클립스 콘솔에서 kafka 소비자의 메시지에 액세스 할 수 없습니까?

구성 측면에서 변경해야 할 사항이 있으면 알려주십시오. 모든 예제 자바 코드에 적합

+1

무엇이 잘못되었는지 소비자 코드를 공유 할 수 있습니까? – Kaushal

+0

@Kaushal, 아래 코드를 게시했습니다. – Sanj

+0

클러스터에서 소비자 jar를 로컬로 실행하십시오. –

답변

0
package com.hortonworks.example.kafka.consumer; 
import org.apache.kafka.clients.CommonClientConfigs; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; 
import java.util.Collection; 
Configuring Kafka for Kerberos 
Over Ambari 
May 9, 2016 
9 
import java.util.Collections; 
import java.util.Properties; 
public class BasicConsumerExample { 
public static void main(String[] args) {`enter code here` 
Properties consumerConfig = new Properties(); 
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka. 
example.com:6667"); 
// specify the protocol for SSL Encryption 
consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT"); 
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); 
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); 
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer"); 
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org. 
apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerConfig); 
TestConsumerRebalanceListener rebalanceListener = new 
TestConsumerRebalanceListener(); 
consumer.subscribe(Collections.singletonList("test-topic"), 
rebalanceListener); 
while (true) { 
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000); 
for (ConsumerRecord<byte[], byte[]> record : records) { 
System.out.printf("Received Message topic =%s, partition =%s, 
offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), 
record.offset(), record.key(), record.value()); 
} 
consumer.commitSync(); 
} 
} 
private static class TestConsumerRebalanceListener implements 
ConsumerRebalanceListener { 
@Override 
public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{ 
System.out.println("Called onPartitionsRevoked with partitions:" + 
partitions); 
} 
@Override 
public void onPartitionsAssigned(Collection<TopicPartition> partitions) 
{ 
System.out.println("Called onPartitionsAssigned with partitions:" + 
partitions); 
} 
} 
}