2017-10-18 3 views
0

폭풍 버전 1.1.0 및 카프카 버전 0.10.1.2를 사용 중입니다.Kafka 스파우트 오류 "소비자가 어떤 주제에도 등록되지 않았거나 파티션이 할당되지 않았습니다."

다음과 같이 나는 카프카 - 주둥이를 만드는 오전 :

public KafkaSpout<String, String> getKafkaSpout() { 
    String _kafkaBrokers = (String) props.get("bootstrap.servers"); 
    String _topic = (String) props.get("kafka.topic.name"); 
    String groupId = (String) props.get("group.id"); 
    int maxMsgSize = (int) props.get("fetch.message.max.bytes"); 
    String keySerializer = (String) props.get("key.serializer"); 
    String valueSerializer = (String) props.get("value.serializer"); 

    List<String>topics = new ArrayList<String>(`enter code here`); 
    topics.add(_topic); 

    return new KafkaSpout<String, String (KafkaSpoutConfig.builder(_kafkaBrokers, topics) 
      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) 
      .setMaxUncommittedOffsets(100) 
      .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId) 
      .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxMsgSize) 
      .setProp("key.serializer",keySerializer) 
      .setProp("value.serializer",valueSerializer) 
      .build()) 
} 

을 내가 다른 종속 나는 아래의 프로젝트에 받는다는 종속성을 언급과 함께 아래에 언급 된 오류

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:973) 
at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:291) 
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:225) 
at org.apache.storm.daemon.executor$fn__9798$fn__9813$fn__9844.invoke(executor.clj:647) 
at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) 
at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) 

을 얻고있다

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka-client</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 

답변

0

List<String>topics = new ArrayList<String>("enter code here");이 문제라고 생각합니까? 그 목록에 주제 이름을 써야 할 것입니다.

종속성 버전이 이상합니다. AFAIK Storm은 해당 버전 문자열에 아무 것도 출시하지 않았습니다.

나는 Kafka가 0.10 클러스터 이상인 storm-kafka-client와 구형 Kafka 클러스터를위한 storm-kafka가 모두 필요한 이유가 궁금합니다. (그러나 최신 카프카와는 여전히 호환됩니다. 생각한다).