0

40 개의 파티션이있는 주제가 있습니다. 설정은 다음과 같습니다.Confluent Kafka : 소비자가 주제의 모든 파티션을 처음부터 읽지 않습니다.

def on_assign (c,ps): 
for p in ps: 
    p.offset=0 
print ps 
c.assign(ps) 

conf = {'bootstrap.servers': 'localhost:9092' 
     'enable.auto.commit' : False, 
     'group.id' : 'confluent_consumer', 
     'default.topic.config': {'auto.offset.reset': 'earliest'} 
     } 
consumer = Consumer(**conf) 
consumer.subscribe(['topic.source'], on_assign=on_assign) 

msg = consumer.poll(timeout=100000) 
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key()) 

topic.source의 모든 파티션에 대해 오프셋 0부터 읽으 려합니다. 하지만 모든 파티션에서 그런 일이 일어나지는 않습니다. 일부 파티션의 경우, 커밋 된 오프셋이라고 가정하고있는 특정 오프셋에서 읽을 때마다 group.id을 변경할 때마다 도움이되지 않습니다. 커밋 된 오프셋과 상관없이이 주제의 모든 파티션을 처음부터 어떻게 읽을 수 있습니까?

나는 on_assign()ps를 인쇄하고는 모두 40 개 파티션이 같은 인쇄 : 당신이 새 값으로 group.id을 설정하거나 어떤 auto.offset.reset 세트에 오프셋 노력하지 않은 그룹을 사용하여 사용하는 경우

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on 

답변

1

earliest 그러면 소비자는 파티션의 시작 부분부터 시작합니다.

즉, 시작 부분은 0이되지 않을 수 있습니다. 브로커의 로그 보존 설정에 따라 Kafka는 메시지를 삭제할 수 있으므로 파티션에서 사용 가능한 첫 번째 메시지는 어떤 오프셋이 될 수 있습니다.

+0

안녕하세요, Mickael, 답변 해 주셔서 감사합니다. 토픽 파티션의 시작 오프셋이 무엇인지 알 수있는 명령/도구가 있습니까 (이전 메시지가 보존 정책으로 인해 삭제되는 경우)? – NoName

+0

브로커를 확인하려면 kafka의'log.dirs'에 들어가서 파티션의 디렉토리를 찾으십시오. 내부에는'.log' 파일이 있어야합니다. 파일의 이름은 첫 번째 오프셋을 나타내야합니다. 예를 들어, '00000000000000000216.log'가 있으면 216이 첫 번째 오프셋입니다. 설정에 따라 몇 가지 로그 파일이있을 수 있습니다. 가장 작은 이름을 사용하십시오. –

+0

안녕하세요, Mickael! 고마워, 내가 log.dirs를 발견하고 * .log 파일을 볼 수있다. – NoName