사람이 아래 queries.I에서 저를 도와 줄 수는 카프카 - 클라이언트 - 0.10.1.1 (단일 노드 단일 브로커) auto.create.topics.enable의카프카 클라이언트 API 질문
기본값을 사용하고 있습니다이 참입니다.
1.I는 소비를 들어
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
사용하여 주제에 메시지를 보내고 :
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
문제는 내가 처음으로 소비자를 실행할 때, 그것은 값을하지 않는 것입니다. 그리고 저는 생산자를 운영하고 소비자를 다시 불러 와서 가치를 얻습니다. 프로듀서를 3 번 실행해야하는 경우도 있습니다. 왜 이렇게 작동합니까? enable.auto.commit 속성이 false 인 경우
이 같은 소비자가 메시지를 여러 번 읽을 수 거짓
2) enable.auto.commit =? 1 point.How 내 소비자 코드는 내가 말 루프를 깰 수 고려
3.) 어떻게 소비자는 항상 사용)은 모든 메시지를 읽고 다음 consumer.close()
kafka bin에 console-consumer가 있습니다. 소비자가 데이터를 소비 할 수없는 동안 시도해 볼 수 있습니다. 가능하면 producer.flush()를 추가하십시오. 3 번 질문에 대해서는 스트리밍 프로그램이 배치의 끝을 알 수있는 방법이 없지만 시간 초과 스레드를 설정하여 데이터가 소비되지 않는 시간 초과를 모니터링 할 수 있습니다. – Lhfcws
예 bin 소비자로 테스트 한 결과 상관 ID 1을 사용하여 메타 데이터를 가져 오는 동안 오류가 발생했습니다 : {my-topic-106 = LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) – jena84
데이터를 생성 했습니까? 최근에 데이터를 소비하기 전에?기본적으로 Kafka는 데이터를 3 일 동안 만 보관합니다. – Lhfcws