0
폭풍 버전 1.1.0 및 카프카 버전 0.10.1.2를 사용 중입니다.Kafka 스파우트 오류 "소비자가 어떤 주제에도 등록되지 않았거나 파티션이 할당되지 않았습니다."
다음과 같이 나는 카프카 - 주둥이를 만드는 오전 :
public KafkaSpout<String, String> getKafkaSpout() {
String _kafkaBrokers = (String) props.get("bootstrap.servers");
String _topic = (String) props.get("kafka.topic.name");
String groupId = (String) props.get("group.id");
int maxMsgSize = (int) props.get("fetch.message.max.bytes");
String keySerializer = (String) props.get("key.serializer");
String valueSerializer = (String) props.get("value.serializer");
List<String>topics = new ArrayList<String>(`enter code here`);
topics.add(_topic);
return new KafkaSpout<String, String (KafkaSpoutConfig.builder(_kafkaBrokers, topics)
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
.setMaxUncommittedOffsets(100)
.setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId)
.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxMsgSize)
.setProp("key.serializer",keySerializer)
.setProp("value.serializer",valueSerializer)
.build())
}
을 내가 다른 종속 나는 아래의 프로젝트에 받는다는 종속성을 언급과 함께 아래에 언급 된 오류
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:973)
at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:291)
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:225)
at org.apache.storm.daemon.executor$fn__9798$fn__9813$fn__9844.invoke(executor.clj:647)
at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)
을 얻고있다
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.0.2.6.2.0-205</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.0.2.6.2.0-205</version>
</dependency>