들어오는 메시지를 여러 값으로 그룹화하는 Kafka Streams 앱이 있습니다. 예를 들면 :Kafka Streams - 여러 집계로 생성 된 내부 주제 수를 줄일 수 있습니까?
예 메시지 :
{ "gender": "female", "location": "canada", "age-group": "25-30" }
토폴로지 : 주제를 많이에서
table
.groupBy((key, value) -> groupByGender) // example key: female
.count("gender-counts");
table
.groupBy((key, value) -> groupByLocation) // example key: canada
.count("location-counts");
table
.groupBy((key, value) -> groupByAgeGroup) // example key: 25-30
.count("age-group-counts");
이 결과 :
my-consumer-gender-counts-changelog
my-consumer-gender-counts-repartition
my-consumer-location-counts-changelog
my-consumer-location-counts-repartition
my-consumer-age-group-counts-changelog
my-consumer-age-group-counts-repartition
우리가 여러 집계를 보낼 수 있다면 좋을 텐데 단일 상태 저장소에 저장하고 값의 그룹을 키의 일부로 포함시킵니다. 예를 들어 :
table
.groupBy((key, value) -> groupByGender) // example key: female_gender
.count("counts");
table
.groupBy((key, value) -> groupByLocation) // example key: canada_location
.count("counts");
table
.groupBy((key, value) -> groupByAgeGroup) // example key: 25-30_age_group
.count("counts");
이 훨씬 적은 주제 초래 : 현재
counts-changelog
counts-repartition
이를, (어쨌든 DSL을 사용) 할 수 표시되지 않는 groupBy
연산자를 사용하여 내부 항목을 작성하기 다시 분할하기 때문에 groupBy
가지가 다른 여러 하위 토폴로지가있는 경우 Kafka Streams는 여러 소스에서 동일한 다시 분할 주제를 등록하려고 시도합니다. 이것은 다음과 같은 오류가 발생합니다
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic counts-repartition has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.validateTopicNotAlreadyRegistered(TopologyBuilder.java:518)
groupBy
경우 더 이상의 레코드를 반환 할 수있다 (예를 들어 같은 flatMap
는 않습니다), '우리는 기록의 모음 (각 그룹에 대한 하나의 레코드)를 반환 할 수 있지만,이 역시 아무튼 DSL을 사용하여 가능할 것 같습니다.
여러 질문 (여러 개의 그룹에 대해 2 개)을 작성해야하는 경우 여러 개의 값 (예 : { "gender": "female", "location": "canada", "age-group": "25-30" }
)으로 그룹화 할 수있는 단일 레코드가 주어진다면 (예 : 100 개의 그룹화가 있었던 경우) ? 하나의 레코드가 여러 값으로 그룹화 될 수있을 때 더 적합한 다른 전략이 있습니까? 내가 (고유 한 키의 수가 매우 적은 경우에도) 단일 아이디어에 여러 집계를 함부로 제안하는 것은 나쁜 생각입니까?