여러 개의 다른 주제를들을 필요가있는 애플리케이션이 있습니다. 각 주제에는 메시지 처리 방법에 대한 별도의 논리가 있습니다. 각 KafkaStreams 인스턴스에 대해 동일한 카프카 속성을 사용하려고 생각했지만 아래 오류가 발생합니다.Kafka Streams : 동일한 'application.id'를 사용하여 여러 주제에서 사용하십시오.
오류
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
코드 (코 틀린)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
내가 발견 당신이 그러나 나는 하드를 발견하고, 여러 주제 application.id
을 사용할 수 없습니다 제안이 reference 이를 지원하는 참조 문서를 찾으십시오. documentationapplication.id
대한 상태 :
스트림 처리 애플리케이션에 대한 식별자. Kafka 클러스터 내에서 고유해야합니다. 이것은 1) 기본 클라이언트 ID 접두어, 2) 멤버쉽 관리를위한 그룹 ID, 3) 변경 로그 주제 접두사로 사용됩니다.
질문
- 이 오류는 무엇을 의미 하는가, 그리고 무엇을 발생합니다.
- 여러 개의 토픽 파티션에서 소비하는 동일한 ID로 실행중인 응용 프로그램 인스턴스를 여러 개 가질 수 있다면 은 "Kafka 클러스터 내에서 고유해야 함"을 의미합니까?
- 동일한 카프카 스트림
application.id
을 사용하여 다른 주제로 나열되는KafkaStreams
두 개를 시작할 수 있습니까? 그렇다면 어떻게?
세부 사항 : 카프카 0.11.0.2
감사합니다. "동일한 주제의 여러 응용 프로그램입니다 .ID는 입력 항목과 동일해야합니다"_라는 문서가 있습니까? –
잘 모르겠습니다. 참고로, AK는 v1 용 스트림 용 문서입니다.0 재 작업을 많이 받음 - 메일 링리스트를 통해 의견이나 제안을 환영합니다. –