kafka 상위 소비자를 사용하고 있습니다. 소비자를 시작하면 새로운 메시지가 모두 발견됩니다. Java kafka 제작자를 사용하여 새로운 메시지를 찾습니다. 그러나 잠시 후 계속 반복되지만 새 메시지는 없습니다. 디버거에서 실행을 일시 중지하면 소비자가 갑자기 메시지를 찾기 시작합니다. Java에서 버전 0.8.0을 사용하고 있습니다. 오류가 발생하면 메시지를 소비하는 프로세스는 별도의 '오류'주제로 메시지를 생성합니다. 이러한 오류 메시지가 표시되지 않게되면이 문제가 발생하지 않습니다.소비자가 디버거에서 실행을 일시 중지 할 때까지 아무런 메시지도 찾지 못합니다.
1
A
답변
0
문제는 내가 보지 못했던 카프카 (kafka) 버그 인 것 같습니다. 동일한 ConsumerConnector를 사용하여 둘 이상의 ConsumerIterator를 만들면 (둘 이상의 주제가있는지도를 제공하여) 주제가 너무 자주 ConsumerIterators로 전환됩니다. 디버거에서 일시 중지하여 consumerIterator를 보려고하면 다시 전환됩니다. 우리가 무엇이 잘못되었는지 파악하기 위해
/**
* @param zookeeperAddresses (includes the port number)
* @param topics all topics to be consumed.
* @return A list of ConsumerIterators.
*/
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
String groupId = "client_" + topics.get(0);
LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
List<ConsumerIterator> topicConsumers = new LinkedList<>();
for (String topic : topics) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(zookeeperAddresses, groupId));
consumers.add(consumer);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, Integer.valueOf(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
assert(streams.size() == 1);
ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
topicConsumers.add(consumerIterator);
}
return topicConsumers;
}
당신의 코드를 게시하시기 바랍니다 :
여기는이 버그를 해결 작동 고정 코드입니다 :
는 여기에 버그가 그 ConsumerIterators를 만들기위한 내 예전의 코드입니다 – serejja