2017-12-20 22 views
0

저는 Kafka (버전 0.11.0.2) 서버 API를 사용하여 로컬 호스트에서 kafka 브로커를 시작합니다. 어떤 문제없이 실행됩니다. 또한, 제작자는 메시지 성공을 보낼 수 있습니다. 그러나 소비자는 메시지를받을 수 없으며 콘솔에 오류 로그가 없습니다. 코드를 디버깅 한 후 "refreshing metadata"에 대해 반복합니다. 여기 log.debug (그룹 {}에 대해 코디네이터를 검색하지 못했습니다. 메타 데이터를 새로 고침 ", groupId) kafka 0.11.0.x로

소스 코드를

while (coordinatorUnknown()) { 
     RequestFuture<Void> future = lookupCoordinator(); 
     client.poll(future, remainingMs); 

     if (future.failed()) { 
      if (future.isRetriable()) { 
       remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); 
       if (remainingMs <= 0) 
        break; 

       log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId); 
       client.awaitMetadataUpdate(remainingMs); 
      } else 
       throw future.exception(); 
     } else if (coordinator != null && client.connectionFailed(coordinator)) { 
      // we found the coordinator, but the connection has failed, so mark 
      // it dead and backoff before retrying discovery 
      coordinatorDead(); 
      time.sleep(retryBackoffMs); 
     } 

     remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); 
     if (remainingMs <= 0) 
      break; 
    } 

Adddtion이다 : 나는 0.10.x에 카프카 버전을 변경, 그것의 실행은 OK.

여기 내 카프카 서버 코드입니다.

private static void startKafkaLocal() throws Exception { 
    final File kafkaTmpLogsDir = File.createTempFile("zk_kafka", "2"); 
    if (kafkaTmpLogsDir.delete() && kafkaTmpLogsDir.mkdir()) { 
     Properties props = new Properties(); 
     props.setProperty("host.name", KafkaProperties.HOSTNAME); 
     props.setProperty("port", String.valueOf(KafkaProperties.KAFKA_SERVER_PORT)); 
     props.setProperty("broker.id", String.valueOf(KafkaProperties.BROKER_ID)); 
     props.setProperty("zookeeper.connect", KafkaProperties.ZOOKEEPER_CONNECT); 
     props.setProperty("log.dirs", kafkaTmpLogsDir.getAbsolutePath()); 
     //advertised.listeners=PLAINTEXT://xxx.xx.xx.xx:por 
    // flush every message. 

     // flush every 1ms 
     props.setProperty("log.default.flush.scheduler.interval.ms", "1"); 
     props.setProperty("log.flush.interval", "1"); 
     props.setProperty("log.flush.interval.messages", "1"); 
     props.setProperty("replica.socket.timeout.ms", "1500"); 
     props.setProperty("auto.create.topics.enable", "true"); 
     props.setProperty("num.partitions", "1"); 

     KafkaConfig kafkaConfig = new KafkaConfig(props); 

     KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig); 
     kafka.startup(); 
     System.out.println("start kafka ok "+kafka.serverConfig().numPartitions()); 
    } 
} 

감사합니다. 카프카 0.11와

답변

1

, 당신은 또한 다음과 같은 3 설정을 설정해야 num.partitions 1로 설정 한 경우 :

0.11을 실행할 때 서버 로그에서 명백해야한다
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 

.

+0

고마워요. 로컬 Kafka 서버를 설정하여 서버 로그를 얻는 방법은 무엇입니까? –

+0

'System.setProperty ("log4j.configuration", 새로운 파일 (.....));' –

+0

정말 감사합니다. –