2017-10-19 1 views
0

Producer.send 콜백은 메시지 객체를 제공합니다. message.offset()은 종종 버그 인 것처럼 0을 반환합니다.confluent-python kafka 제작자가 콜백 보내기 message.offset()은 0을 반환합니다.

confluent-kafka 파이썬 라이브러리 버전 0.11.0 librdkafka : stable 0.11.0 (bottled), HEAD. 맥 OS 브루

다음의 간단한 테스트 프로그램을 통해 설치된 :

import confluent_kafka 
import timeit 


def delivery_callback(error, message): 
    print("delivery_callback. error={}. message={}".format(error, message)) 
    print("message.topic={}".format(message.topic())) 
    print("message.timestamp={}".format(message.timestamp())) 
    print("message.key={}".format(message.key())) 
    print("message.value={}".format(message.value())) 
    print("message.partition={}".format(message.partition())) 
    print("message.offset={}".format(message.offset())) 


def produce_string_messages(kafka_producer, topic_name, num_messages): 
    start_time = timeit.default_timer() 

    for i in range(num_messages): 
     kafka_producer.produce(topic_name, value="cf-k test. v{}".format(i), on_delivery=delivery_callback) 

    elapsed = timeit.default_timer() - start_time 
    print("completed producing messages. They are queued for delivery. elapsed={}. elapsed/msg={}".format(elapsed, elapsed/num_messages)) 


if __name__ == "__main__": 
    print("starting") 

    conf = { 
     'bootstrap.servers': "kafka-broker-1:9092" 
    } 

    kafka_producer = confluent_kafka.Producer(conf) 

    print("opened KafkaProducer") 
    produce_string_messages(kafka_producer, "my-string-topic", 3) 

    print("flushing...") 
    kafka_producer.flush() 

    print("exiting") 

는 생산 : message.offset() 처음 두 메시지 및 제 0이 아닌 용 제로

starting 
opened KafkaProducer 
completed producing messages. They are queued for delivery. elapsed=0.000994920730591. elapsed/msg=0.00033164024353 
flushing... 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v0 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v1 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v2 
message.partition=0 
message.offset=24 
exiting 

공지있다. 이 테스트 프로그램을 다시 실행하여 세 개의 메시지를 보내는 경우 세 번째 message.offset은 3 씩 증가합니다. message.offset()이 종종 0을 잘못 반환하는 버그처럼 보입니다.

답변

0

배달 보고서는 올바른 오프셋 생성 된 배치의 마지막 메시지 이것은과 같이 true로 produce.offset.report 주제 레벨 구성 속성을 설정하여 일괄 처리에있는 모든 메시지에 대한 적절한 오프셋을 제공하기 위해 변경할 수 있습니다

p = confluent_kafka.Producer({'bootstrap.servers': ..., 
           'default.topic.config': { 'produce.offset.report': True } }) 

우리의 다음 릴리스에서 사실로 기본값을 변경할 수 있습니다 파이썬 클라이언트.

[1] : 일괄 처리에서 메시지의 선형 스캔은 피할 수 있지만 성능에 미치는 영향은 Python 랜드에서는별로 중요하지 않으므로 걱정할 필요가 없습니다.

+0

Perfect. 감사! 주 저자로서이 기사에 응답 할 수 있습니까? https://stackoverflow.com/questions/44732214/apt-get-install-librdkafka1-fails-on-debian-9-x-due-to-libssl- dependency – clay