2017-09-12 7 views
0

나는 KafkaConsumer이며 수동 파티션 할당을 사용합니다. 파티션을 배포하기 위해 추가 된 파티션을 감지하기 위해 정기적 인 간격으로 consumer.partitionsFor(topicId)을 사용합니다. 작업이 영원히 계속 실행되기 때문에이 케이스를 지원하고 싶습니다. 그러나 이것은 소비자를 다시 시작하지 않는 한 항상 파티션의 초기 목록을 반환합니다.아파치 카프카에 추가 된 파티션을 사용하는 중

소비자가 파티션을 추가하는 방법이 있습니까? 설문 조사 또는 청취 할 대상은 무엇입니까?

+0

또한 자동 할당을 테스트했으며 추가 된 파티션도 감지하지 못하는 것으로 보입니다. – Oliv

+0

자동 할당에 대한 코드를 보여주십시오. 그렇습니다. 수동으로는이 변경 사항을 감지 할 수 없지만,'subscribe'가 처리 할 수 ​​있다고 생각합니다. – GuangshengZuo

+0

그냥'consumer.subscribe (singletonList ("my_topic"))'입니다. 그런 다음'consumer.assignment()'의 결과를 확인하고 파티션을 추가 한 후에도 변경되지 않습니다. – Oliv

답변

0

KafkaConsumer에는 configuration property "metadata.max.age.ms"가 있으며, 기본값은 5 분입니다. 즉, consumer.partitionsFor을 호출 할 때마다 해당 시간 동안 캐시 된 복사본이 반환되고 새 메타 데이터 만 가져옵니다.

속성을 0으로 설정하면 매번 새로운 메타 데이터를 가져옵니다.

+0

kafka 문제보기 https://issues.apache.org/jira/browse/KAFKA-5881 – Oliv

-1

아마도 assignment() 메서드로 쿼리 할 때 파티션이 재 할당되는 중일 수 있습니다. 5 초 동안 잠을 잘 수 있었는지 확인한 다음 테스트 해보십시오.

현재이 소비자에게 할당 된 파티션 세트를 가져옵니다. assign (Collection)을 사용하여 파티션을 직접 할당하여 구독이 발생하면 할당 된 동일한 파티션을 반환합니다. 토픽 구독이 사용 된 경우 현재 소비자에게 할당 된 토픽 파티션 세트가 제공됩니다 (할당이 아직 수행되지 않았거나 파티션이 재 할당 중일 경우 아무 것도 표시되지 않음).

+0

적어도 30 초 기다렸습니다. – Oliv

+0

이 그룹의 유일한 소비자입니까? – GuangshengZuo

+0

두 개의 jvm에서 두 소비자가 있지만, 모두에 'assignment()'출력을 출력합니다 – Oliv