Kafka 스 트리머로 Confluent-3.2.1을 사용하고 있습니다. 내 KGroupedStream<String, MyClass1>
을 KTable<Windowed<String>,MsgAggr>
에 집계하려고합니다. 집계를 사용하는 동안 나는 TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
도 사용하고 있습니다. 나는 사용자 정의 "Serdes"를 집계에 대한 인수로 사용하고 있습니다.Kafka Streamer : 사용자 정의 'Serdes'관련 문제
[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
에서 동일한 것을 사용하십시오. 다른 질문은 이벤트 시간대 처리에 대한 것입니다. 집합 내에서 TimeWindow를 사용하고 있습니다. 또한'streamsConfiguration.put (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); '을 사용하여 스트림에서 타임 스탬프를 추출합니다.'메시지 집계를 제어하기 위해 메시지의 일부인 시간을 사용하고 싶습니다. 이것을 어떻게 할 수 있습니까? – kadsank
'MyEventTimeExtractor' 구현체는'ConsumerRecord # timestamp' 메소드를 사용하여 이벤트 타임 스탬프를 얻을 수 있습니다 – Abhishek
카프카의 레코드 임베드 타임 스탬프를 사용하려면 미리 정의 된 추출기가 3 가지 있습니다 : http : // docs .confluent.io/current/streams/developer-guide.html # streams-developer-guide-timestamp-extractor 따라서이 경우 자신 만의 추출기를 구현해야하는지 잘 모르겠습니다. –