ConsumerSeekAware를 사용하여 카프카 주제에서 사용 가능한 마지막 메시지를 읽으려고합니다. 메시지 유형은 Avro 객체 목록입니다. 나는 그것을 성공적으로 할 수있다. 그러나 deserialization 중 실패 할 때. 메시지는 spring-cloud-stream-kafka 프레임 워크를 사용하여 생성되었습니다. 메시지의 콘텐츠 유형은 contentType=application/x-java-object;type=java.util.ArrayList
입니다.spring-cloud-stream-kafka-binder가 생성 한 spring-kafka를 사용하는 소비자 메시지
저는 avro 메시지가 아래와 같이 비 직렬화 될 수 있음을 알고 있습니다.
DatumReader<GenericRecord> datumReader =
new SpecificDatumReader<>(targetType.newInstance().getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
하지만 작동하지 않습니다. 그것은 두 가지 때문일 수 있습니다.
메시지는 avro 개체 목록입니다. 하지만 Avro 스키마를 사용하여 datamReader를 만들려고합니다. 하지만 Schema.createArray (UserDTO.class)와 같은 스키마를 만들려고했습니다. 하지만 그것은 작동하지 않습니다.
나는 브로 메시지에 대해 예상되는 콘텐츠 형식이 응용 프로그램/브로하지만 메시지가 SCS에 의해 생산 될 때 내가
org.apache.kafka.common.serialization.Deserializer
을 구현하여 디시리얼라이저를 작성하고KafkaConsumerFactory
를 구성하려고contentType=application/x-java-object;
생각 . 누군가 도울 수 있습니까?