1.0.0 Kafka 관리 클라이언트를 사용하여 프로그래밍 방식으로 브로커에 주제를 작성하고자합니다. Scala를 사용하고 있습니다. 나는 카프카 브로커에 대한 주제를 작성하는 중 다음 코드를 사용하려고했습니다 또는 간단하게 사용할 수있는 주제를하거나 명령을Kafka 1.0.0 관리 클라이언트가 EOFException을 사용하여 주제를 만들 수 없음
import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}
import scala.collection.JavaConverters._
val zkServer = "localhost:2181"
val topic = "test1"
val zookeeperConnect = zkServer
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 8 * 1000
val partitions = 1
val replication:Short = 1
val topicConfig = new Properties() // add per-topic configurations settings here
import org.apache.kafka.clients.admin.AdminClientConfig
val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer)
val admin = AdminClient.create(config)
val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true))
val nms = existing.names()
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails
val newTopic = new NewTopic(topic, partitions, replication)
newTopic.configs(Map[String,String]().asJava)
val ret = admin.createTopics(List(newTopic).asJavaCollection)
ret.all().get() // Also fails
admin.close()
을 나열, 사육사는 (3.4.10) 측은 예외 : EOFException를 throw하고 연결을 닫습니다 . ZooKeeper 자체를 디버깅 할 때 관리자 클라이언트가 보내는 메시지를 deserialize 할 수없는 것 같습니다 (읽으려는 바이트가 부족합니다)
1.0.0 Kafka 관리 클라이언트를 사용할 수있는 사람은 누구나 주제를 만들거나 나열 하시겠습니까?
고맙습니다. 이것은 API가 여러분에게 연결을 요구하는 https://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java와 같은 0.9/0.10의 예제와 비교할 때 상당히 문제가됩니다. 카프카보다는 사육사에게 – glorat