2017-12-20 30 views
-1

프로세서 API를 사용하여 카프카 스트리밍 응용 프로그램을 만듭니다. 내가 들어오는 모든 메시지입력 레코드 타임 스탬프 및 출력 레코드 타임 스탬프는 원본 및 싱크 항목에서 동일합니까?

에 타임 스탬프를 첨부 할 항목을 만드는 방법은 다음과

이다 kafka-topics.sh --create --zookeeper 로컬 호스트 : 2181 --replication 요인 1 --partitions 1 - -topic topicName --config message.timestamp.type = CreateTime 더

워크 플로는 소스 항목에서 들어오는 메시지를 처리하고 주제를 가라 앉기를 게시한다. 몇 가지 이상한 이유 때문에 소스 및 싱크 주제 메시지에 동일한 타임 스탬프가 표시됩니다. 예를 들어, 메시지 작성 시간에 대한 소스 주제에서 싱크 주제와 동일하게 유지되는 T0입니다.

싱크 주제 메시지의 업데이트 된 타임 스탬프를 보려면 어떻게해야합니까?

+0

카프카 파이프 라인의 데이터는 변경 불가능합니다 – 0xtvarun

답변

0

CreateTime으로 주제를 구성하면 타임 스탬프 저장소가 제작자가 제공 한 타임 스탬프가됩니다.

보통 KafkaProducer의 경우 명시 적으로 시간 소인을 지정하지 않으면 KafkaProducerSystem.currentTimeMillis()을 사용하고 메시지를 브로커에 보냅니다.

카프카 스트림의 경우 특정 타임 스탬프가있는 입력 레코드를 읽는 경우 타임 스탬프 유추 논리를 사용하여 결과 레코드의 타임 스탬프를 계산합니다. 따라서 Kafka Streams는 내부적으로 사용 된 KafkaProducer에 타임 스탬프를 명시 적으로 설정하므로 제작자는이 타임 스탬프를 사용하고 현재의 벽시계를 사용하지 않습니다. 스트림 처리의 경우, 이것은 일반적으로 바람직한 동작입니다.

하나의 주제에서 다른 주제로 데이터를 복사하는 간단한 파이프 라인이있는 경우, 시간 소인 추정은 입력 레코드 시간 소인을 출력 레코드 시간 소인으로 사용합니다. 당신을 위해

  1. 구성 WallClockTimestampExtractor 카프카 스트림 응용 프로그램 :

    는 다른 의미를 취득 할 수있는 두 가지가 있습니다. 이 경우 Kafka Stream은 포함 된 레코드 타임 스탬프를 사용하지 않고 출력 레코드의 타임 스탬프를 가져 오는 현재 벽 시계 시간을 사용합니다.

  2. 출력 항목을 CreateTime 대신 AppendTime으로 구성하십시오. 이 경우 브로커는 항상 현재 브로커 벽 시계 시간과 함께 제작자가 제공 한 레코드 타임 스탬프를 덮어 씁니다.
+0

답장을 보내 주셔서 감사합니다 :-). 나는 두 가지 방법을 시도 할 것이다. – Prashanth