2017-05-23 6 views
0

Kafka 스 트리머로 Confluent-3.2.1을 사용하고 있습니다. 내 KGroupedStream<String, MyClass1>KTable<Windowed<String>,MsgAggr>에 집계하려고합니다. 집계를 사용하는 동안 나는 TimeWindows.of(TimeUnit.SECONDS.toMillis(5))도 사용하고 있습니다. 나는 사용자 정의 "Serdes"를 집계에 대한 인수로 사용하고 있습니다.Kafka Streamer : 사용자 정의 'Serdes'관련 문제

[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249) 
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String 
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) 
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String 
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) 
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 

답변

3

문제는 message.groupByKey();을 함께 : 사용자의 코드는 내가 오류를받은 코드를 실행하면

Map<String, Object> serdeProps = new HashMap<>(); 

final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>(); 
serdeProps.put("JsonPOJOClass", MsgAggr.class); 
pageViewSerializer.configure(serdeProps, false); 

final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>(); 
serdeProps.put("JsonPOJOClass", MsgAggr.class); 
pageViewDeserializer.configure(serdeProps, false); 

final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);` 

스트리밍을위한 코드

KGroupedStream<String, MyClass1> msg_grp = message 
      .groupByKey(); 
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp 
      //.reduce(new Reduced(), arg1, arg2); 
      .aggregate(new Init(), 
        new Aggr(), 
        TimeWindows.of(TimeUnit.SECONDS.toMillis(5)), 
        pageViewSerde, 
        "MySample_out"); 

이다,이다 "의 Serdes"를 정의 . 귀하의 사용자 정의 클래스에 대한 문자열 Serde 사용 MyClass1. MyClass1에 대한 사용자 지정 serializer 및 deserializer를 구현하고 오버로드 된 버전 groupByKey - https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde)

+0

에서 동일한 것을 사용하십시오. 다른 질문은 이벤트 시간대 처리에 대한 것입니다. 집합 내에서 TimeWindow를 사용하고 있습니다. 또한'streamsConfiguration.put (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); '을 사용하여 스트림에서 타임 스탬프를 추출합니다.'메시지 집계를 제어하기 위해 메시지의 일부인 시간을 사용하고 싶습니다. 이것을 어떻게 할 수 있습니까? – kadsank

+0

'MyEventTimeExtractor' 구현체는'ConsumerRecord # timestamp' 메소드를 사용하여 이벤트 타임 스탬프를 얻을 수 있습니다 – Abhishek

+0

카프카의 레코드 임베드 타임 스탬프를 사용하려면 미리 정의 된 추출기가 3 가지 있습니다 : http : // docs .confluent.io/current/streams/developer-guide.html # streams-developer-guide-timestamp-extractor 따라서이 경우 자신 만의 추출기를 구현해야하는지 잘 모르겠습니다. –