2017-03-23 3 views
0

내가이 개 주제를 소모 전형적인 samza 작업을 분할 소모 할 수 있습니다. 이 두 주제의 각각 하나의 파티션 만있는 경우은 어떻게 samza 작업은 하나 이상의 카프카 스트림

이 작업은 잘 작동합니다. data을 10 개의 파티션으로 분할하고 config을 하나의 파티션으로 남겨두면 상황이 바뀌 었습니다.

task[0] -> config, data[0] task[1] -> data[1] ... task[9] -> data[9]

각 작업 자체 rocksdb 인스턴스 초기화 것 같다, 그래서 작업에만 [0 : 기본적으로 samza 파티션 0 ~ 9 data 주제 만 작업 0의 소비 열 작업을 생성하는 것은 config 주제를 소모 ]는 모든 설정 데이터를 rocksdb 인스턴스에 저장하고, task [1 ~ 9]에는 설정 데이터가 없으므로 들어오는 데이터의 설정 정보를 찾을 수 없습니다. 이를 달성하기 위해 어떤 방법

task[0] -> config, data[0] task[1] -> config, data[1] ... task[9] -> config, data[9]

있습니까 : 내가 기대했던

각 작업은 데이터 파티션이 같은 설정 스트림에서 메시지를 소비입니까?

답변

4

입력 스트림 (들)의 파티션은 "job.systemstreampartition.grouper.factor"을 사용하여 구성되는 플러그 그루퍼에 의해 지배된다 분포. 기본적으로이 클래스는 작업 인스턴스간에 들어오는 스트림 파티션을 그룹화합니다. 기본적으로 GroupByPartitionId를 수행한다고 생각합니다. 그래서 당신은 작업 [0]에서 데이터 [0]과 config [0]을보고 있습니다.

는 사용자 지정 SSPGrouper을 구현할 수 있습니다. 그러나, 당신이 찾고있는 것은 "방송"입력 스트림으로 일반 입력 스트림을 "설정"스트림으로 스트림을 "데이터"를 치료하는 것입니다. 브로드 캐스트는 Samza 작업의 모든 작업이이 스트림의 파티션에서 읽음을 의미합니다. 이렇게하면 각 작업 인스턴스는 설정 스트림의 데이터로 로컬 rocksdb를 채울 수 있습니다. 당신은 같은 방송 스트림을 구성 할 수 있습니다 귀하의 경우를 들어 task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]

을, 당신은 구성 할 수 있습니다 task.inputs = <systemName>.data task.broadcast.inputs = <systemName>.config#0

체크 아웃 Broadcast Streams in Samza

+0

고마워, @NavinaRamesh을. 그것은 나를 위해 작동합니다. 스레드에서'예외 "주요"java.lang.IllegalArgumentException가 : kafka.config 번호 [0]에 잘못된 형식 – Aries

+0

은 @NavinaRamesh의 제안에 따라, 나는 예외가 발생했습니다. 방송 스트림 이름 'system.stream # PartitionID에'올바른 형식이어야 수단 또는 'system.stream 번호 [partitionN-partitionM]' ','task.broadcast.inputs = 의 .config # 0 '형태이어야 올바른 형식으로 변경하면 모든 것이 매력처럼 작동합니다. 고마워하고 환호, @ NavinaRamesh :) – Aries

+0

다행 당신은 그것을 알아낼 수 있었다 :) 나는 내 응답에 구성 형식을 수정했습니다. 감사! –