2017-12-27 37 views
0

여러 개의 다른 주제를들을 필요가있는 애플리케이션이 있습니다. 각 주제에는 메시지 처리 방법에 대한 별도의 논리가 있습니다. 각 KafkaStreams 인스턴스에 대해 동일한 카프카 속성을 사용하려고 생각했지만 아래 오류가 발생합니다.Kafka Streams : 동일한 'application.id'를 사용하여 여러 주제에서 사용하십시오.

오류

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic 

코드 (코 틀린)

class KafkaSetup() { 
    companion object { 
     private val LOG = LoggerFactory.getLogger(this::class.java) 
    } 

    fun getProperties(): Properties { 
     val properties = Properties() 
     properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") 
     return properties 
    } 

    private fun listenOnMyTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic") 

     kStream.foreach { key, value -> LOG.info("do stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 

    private fun listenOnMyOtherTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic") 

     kStream.foreach { key, value -> LOG.info("do other stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 
} 

내가 발견 당신이 그러나 나는 하드를 발견하고, 여러 주제 application.id을 사용할 수 없습니다 제안이 reference 이를 지원하는 참조 문서를 찾으십시오. documentationapplication.id 대한 상태 :

스트림 처리 애플리케이션에 대한 식별자. Kafka 클러스터 내에서 고유해야합니다. 이것은 1) 기본 클라이언트 ID 접두어, 2) 멤버쉽 관리를위한 그룹 ID, 3) 변경 로그 주제 접두사로 사용됩니다.

질문

  1. 이 오류는 무엇을 의미 하는가, 그리고 무엇을 발생합니다.
  2. 여러 개의 토픽 파티션에서 소비하는 동일한 ID로 실행중인 응용 프로그램 인스턴스를 여러 개 가질 수 있다면 은 "Kafka 클러스터 내에서 고유해야 함"을 의미합니까?
  3. 동일한 카프카 스트림 application.id을 사용하여 다른 주제로 나열되는 KafkaStreams 두 개를 시작할 수 있습니까? 그렇다면 어떻게?

세부 사항 : 카프카 0.11.0.2

답변

2

카프카 스트림 파티션이 아닌 주제를 통해 확장 할 수 있습니다. 따라서 동일한 application.id으로 여러 응용 프로그램을 시작한 경우 구독하는 입력 항목과 해당 처리 논리와 관련하여 동일해야합니다. 응용 프로그램은 application.idgroup.id으로 사용하여 소비자 그룹을 구성하므로 입력 토픽의 다른 파티션이 다른 인스턴스에 할당됩니다.

당신이 같은 논리와 다른 항목이있는 경우, 당신은 (각 인스턴스에서 시작) 한 번에 모든 주제에 가입 할 수 있습니다. 확장은 여전히 ​​파티션을 기반으로합니다. (기본적으로 입력 주제의 "병합"입니다.)

주제를 통해 확장하거나 다른 처리 논리를 사용하려면 다른 Kafka Streams 응용 프로그램마다 다른 application.id을 사용해야합니다.

+0

감사합니다. "동일한 주제의 여러 응용 프로그램입니다 .ID는 입력 항목과 동일해야합니다"_라는 문서가 있습니까? –

+0

잘 모르겠습니다. 참고로, AK는 v1 용 스트림 용 문서입니다.0 재 작업을 많이 받음 - 메일 링리스트를 통해 의견이나 제안을 환영합니다. –