2017-05-12 1 views
0

저는 소비자 데이터에 0.9 버전의 Kafka를 사용하지만, 프로그램은 항상 consumer.poll (100) 행으로 실행되고 계속 실행되지 않습니다. 나는 kafka-clients-0.9.0.0의 항아리를 사용합니다.
카프카 : kafka_2.10-0.9.0.0
사육사 : 사육사-3.4.5kafka-0.9는 소비자 데이터가 아닙니다.

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "10.28.176.11:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("tmp")); 
    try { 
     while (true) { 
      System.out.println("start comsuming..."); 
      ConsumerRecords<String, String> records = consumer.poll(100); 
      System.out.println("start print data......."); 
      for (ConsumerRecord<String, String> record : records) 
       System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     } 
    } finally { 
     consumer.close(); 
    } 
} 

로그 : 내가 알아 낸

enter image description here

+0

나는 여기에서 언급 한 방법 (http://stackoverflow.com/questions/37770024/kafka-0-9-0-1-java-consumer-stuck-in-awaitmetadataupdate)을 사용하여 ADVERTISED_PORT 및 ADVERTISED_HOST 변수를 설정합니다. ,하지만 여전히 소비자에 붙어 있습니다. 폴 통화. –

답변

0

. 나는/borkers/admin/comsumers/config/controller/controller_epoch/isr_change_notification과 같이 사육사의 모든 카프카 메타 데이터를 삭제 한 후 카프카를 다시 시작했다. 아마도 카프카 클러스터에 약간의 오류가 있다고 생각합니다.