2017-03-22 2 views
0

기존 주제의 데이터를 다른 주제로 스트리밍하는 간단한 k 스트림 응용 프로그램을 작성 중입니다.스레드 "StreamThread-1"에서 Kafka-Stream 예외 java.lang.IllegalArgumentException : 유효하지 않은 타임 스탬프 -1

: 내가 가진 응용 프로그램을 실행할 때, 그러나

package com.mycompany.app; 

import org.apache.kafka.common.serialization.Serdes; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import java.util.Properties; 


public class App { 

    public static void main(String[] args) throws Exception { 

     Properties props = new Properties(); 
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); 
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10); 

     KStreamBuilder builder = new KStreamBuilder(); 

     builder.stream("test").to("testout");; 

     KafkaStreams streams = new KafkaStreams(builder, props); 
     streams.start(); 

     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
      @Override 
      public void run() { 
       streams.close(); 
      } 
     })); 
    } 
} 

:

내 생산자가 test라는 주제 streaming meetup's open data 내가이 testout 주제 여기

로 처리 할 코드의

java -cp target/my-app-1.0-SNAPSHOT.jar com.mycompany.app.App 

이 예외가 발생합니다 :

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1 
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) 
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) 

나는 오랫동안 찾고 있었고, 왜 나는이 오류가 발생하는지 찾을 수 없다.

아이디어가 있으십니까? 에서

답변

0

: http://docs.confluent.io/3.1.0/streams/faq.html#invalid-timestamp-exception

이 오류는 카프카 스트림 응용 프로그램의 타임 스탬프 추출기는 기록에서 유효한 타임 스탬프를 추출하는 데 실패했음을 의미한다. 일반적으로 이것은 레코드 문제 (예 : 레코드에 타임 스탬프가 전혀 없음)를 나타내지 만 응용 프로그램에서 사용하는 타임 스탬프 추출기의 문제 또는 버그를 나타낼 수도 있습니다.

레코드가 유효한 타임 스탬프가 포함되어 있지 않습니다

: 그것은 당신의 기록이 포함 된 타임 스탬프를 수행하지 않는 것이 대부분 기본 ConsumerRecordTimestampExtractor를 사용하는 경우

  • 이다 (임베디드 기록 타임 스탬프는 카프카의 메시지에 도입되었다 Kafka 0.10 형식). 예를 들어 이전 Kafka 제작자 클라이언트 (예 : 버전 0.9 또는 이전 버전) 또는 제 3 자 제작자 클라이언트가 작성한 주제를 사용하는 경우 이러한 상황이 발생할 수 있습니다. 이 상황이 발생할 수있는 또 다른 상황은 Kafka 클러스터를 0.9에서 0.10으로 업그레이드 한 후 0.9로 생성 된 모든 데이터에 0.10 메시지 타임 스탬프가 포함되지 않은 경우입니다.
  • 사용자 정의 타임 스탬프 추출기를 사용하는 경우 추출기가 올바르지 않은 (부정적) 타임 스탬프를 올바르게 처리하고 있는지 확인하십시오. 여기서 "올바르게"는 응용 프로그램의 의미에 따라 다릅니다. 예를 들어 유효한 타임 스탬프를 추출 할 수없는 경우 기본 타임 스탬프 또는 예상 타임 스탬프를 반환 할 수 있습니다. 데이터의 타임 스탬프 필드가 누락 된 것일 수 있습니다.
  • WallclockTimestampExtractor를 통해 처리 시간 의미로 전환 할 수도 있습니다. 그러한 대체가 이러한 상황에 대한 적절한 대응인지 여부는 사용 사례에 따라 다릅니다. 그러나 먼저 문제가되는 기록이 왜 카프카에게 쓰여 졌는지에 대한 근본 원인을 확인하고 수정해야합니다. 두 번째 단계에서는 이러한 레코드를 처리 할 때 (예를 들어 이러한 레코드를 처리해야하는 경우) 임시 해결책 (위에 설명 된대로)을 적용하는 것이 좋습니다. 또 다른 옵션은 정확한 타임 스탬프로 레코드를 재생성하고 새로운 카프카 (Kafka) 주제에 기록하는 것입니다.