40 개의 파티션이있는 주제가 있습니다. 설정은 다음과 같습니다.Confluent Kafka : 소비자가 주제의 모든 파티션을 처음부터 읽지 않습니다.
def on_assign (c,ps):
for p in ps:
p.offset=0
print ps
c.assign(ps)
conf = {'bootstrap.servers': 'localhost:9092'
'enable.auto.commit' : False,
'group.id' : 'confluent_consumer',
'default.topic.config': {'auto.offset.reset': 'earliest'}
}
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
topic.source
의 모든 파티션에 대해 오프셋 0부터 읽으 려합니다. 하지만 모든 파티션에서 그런 일이 일어나지는 않습니다. 일부 파티션의 경우, 커밋 된 오프셋이라고 가정하고있는 특정 오프셋에서 읽을 때마다 group.id
을 변경할 때마다 도움이되지 않습니다. 커밋 된 오프셋과 상관없이이 주제의 모든 파티션을 처음부터 어떻게 읽을 수 있습니까?
나는 on_assign()
에 ps
를 인쇄하고는 모두 40 개 파티션이 같은 인쇄 : 당신이 새 값으로 group.id
을 설정하거나 어떤 auto.offset.reset
세트에 오프셋 노력하지 않은 그룹을 사용하여 사용하는 경우
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
안녕하세요, Mickael, 답변 해 주셔서 감사합니다. 토픽 파티션의 시작 오프셋이 무엇인지 알 수있는 명령/도구가 있습니까 (이전 메시지가 보존 정책으로 인해 삭제되는 경우)? – NoName
브로커를 확인하려면 kafka의'log.dirs'에 들어가서 파티션의 디렉토리를 찾으십시오. 내부에는'.log' 파일이 있어야합니다. 파일의 이름은 첫 번째 오프셋을 나타내야합니다. 예를 들어, '00000000000000000216.log'가 있으면 216이 첫 번째 오프셋입니다. 설정에 따라 몇 가지 로그 파일이있을 수 있습니다. 가장 작은 이름을 사용하십시오. –
안녕하세요, Mickael! 고마워, 내가 log.dirs를 발견하고 * .log 파일을 볼 수있다. – NoName