2017-12-18 8 views
1

1.0.0 Kafka 관리 클라이언트를 사용하여 프로그래밍 방식으로 브로커에 주제를 작성하고자합니다. Scala를 사용하고 있습니다. 나는 카프카 브로커에 대한 주제를 작성하는 중 다음 코드를 사용하려고했습니다 또는 간단하게 사용할 수있는 주제를하거나 명령을Kafka 1.0.0 관리 클라이언트가 EOFException을 사용하여 주제를 만들 수 없음

import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic} 
import scala.collection.JavaConverters._ 

val zkServer = "localhost:2181" 
val topic = "test1" 

val zookeeperConnect = zkServer 
val sessionTimeoutMs = 10 * 1000 
val connectionTimeoutMs = 8 * 1000 


val partitions = 1 
val replication:Short = 1 
val topicConfig = new Properties() // add per-topic configurations settings here 

import org.apache.kafka.clients.admin.AdminClientConfig 
val config = new Properties 
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer) 
val admin = AdminClient.create(config) 

val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true)) 
val nms = existing.names() 
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails 

val newTopic = new NewTopic(topic, partitions, replication) 
newTopic.configs(Map[String,String]().asJava) 
val ret = admin.createTopics(List(newTopic).asJavaCollection) 
ret.all().get() // Also fails 
admin.close() 

을 나열, 사육사는 (3.4.10) 측은 예외 : EOFException를 throw하고 연결을 닫습니다 . ZooKeeper 자체를 디버깅 할 때 관리자 클라이언트가 보내는 메시지를 deserialize 할 수없는 것 같습니다 (읽으려는 바이트가 부족합니다)

1.0.0 Kafka 관리 클라이언트를 사용할 수있는 사람은 누구나 주제를 만들거나 나열 하시겠습니까?

답변

2

AdminClient는 Kafka에 직접 연결되므로 Zookeeper에 액세스 할 필요가 없습니다.

Zookeeper 대신 Kafka 중개인 (예 : localhost:9092)을 가리 키도록 AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG을 설정해야합니다.

+0

고맙습니다. 이것은 API가 여러분에게 연결을 요구하는 https://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java와 같은 0.9/0.10의 예제와 비교할 때 상당히 문제가됩니다. 카프카보다는 사육사에게 – glorat