0
저는 Kafka (버전 0.11.0.2) 서버 API를 사용하여 로컬 호스트에서 kafka 브로커를 시작합니다. 어떤 문제없이 실행됩니다. 또한, 제작자는 메시지 성공을 보낼 수 있습니다. 그러나 소비자는 메시지를받을 수 없으며 콘솔에 오류 로그가 없습니다. 코드를 디버깅 한 후 "refreshing metadata"에 대해 반복합니다. 여기 log.debug (그룹 {}에 대해 코디네이터를 검색하지 못했습니다. 메타 데이터를 새로 고침 ", groupId) kafka 0.11.0.x로
소스 코드를while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future, remainingMs);
if (future.failed()) {
if (future.isRetriable()) {
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
client.awaitMetadataUpdate(remainingMs);
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
coordinatorDead();
time.sleep(retryBackoffMs);
}
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
}
Adddtion이다 : 나는 0.10.x에 카프카 버전을 변경, 그것의 실행은 OK.
여기 내 카프카 서버 코드입니다.
private static void startKafkaLocal() throws Exception {
final File kafkaTmpLogsDir = File.createTempFile("zk_kafka", "2");
if (kafkaTmpLogsDir.delete() && kafkaTmpLogsDir.mkdir()) {
Properties props = new Properties();
props.setProperty("host.name", KafkaProperties.HOSTNAME);
props.setProperty("port", String.valueOf(KafkaProperties.KAFKA_SERVER_PORT));
props.setProperty("broker.id", String.valueOf(KafkaProperties.BROKER_ID));
props.setProperty("zookeeper.connect", KafkaProperties.ZOOKEEPER_CONNECT);
props.setProperty("log.dirs", kafkaTmpLogsDir.getAbsolutePath());
//advertised.listeners=PLAINTEXT://xxx.xx.xx.xx:por
// flush every message.
// flush every 1ms
props.setProperty("log.default.flush.scheduler.interval.ms", "1");
props.setProperty("log.flush.interval", "1");
props.setProperty("log.flush.interval.messages", "1");
props.setProperty("replica.socket.timeout.ms", "1500");
props.setProperty("auto.create.topics.enable", "true");
props.setProperty("num.partitions", "1");
KafkaConfig kafkaConfig = new KafkaConfig(props);
KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
kafka.startup();
System.out.println("start kafka ok "+kafka.serverConfig().numPartitions());
}
}
감사합니다. 카프카 0.11와
고마워요. 로컬 Kafka 서버를 설정하여 서버 로그를 얻는 방법은 무엇입니까? –
'System.setProperty ("log4j.configuration", 새로운 파일 (.....));' –
정말 감사합니다. –