1

ConsumerSeekAware를 사용하여 카프카 주제에서 사용 가능한 마지막 메시지를 읽으려고합니다. 메시지 유형은 Avro 객체 목록입니다. 나는 그것을 성공적으로 할 수있다. 그러나 deserialization 중 실패 할 때. 메시지는 spring-cloud-stream-kafka 프레임 워크를 사용하여 생성되었습니다. 메시지의 콘텐츠 유형은 contentType=application/x-java-object;type=java.util.ArrayList입니다.spring-cloud-stream-kafka-binder가 생성 한 spring-kafka를 사용하는 소비자 메시지

저는 avro 메시지가 아래와 같이 비 직렬화 될 수 있음을 알고 있습니다.

DatumReader<GenericRecord> datumReader = 
      new SpecificDatumReader<>(targetType.newInstance().getSchema()); 
     Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); 

     result = (T) datumReader.read(null, decoder); 

하지만 작동하지 않습니다. 그것은 두 가지 때문일 수 있습니다.

  1. 메시지는 avro 개체 목록입니다. 하지만 Avro 스키마를 사용하여 datamReader를 만들려고합니다. 하지만 Schema.createArray (UserDTO.class)와 같은 스키마를 만들려고했습니다. 하지만 그것은 작동하지 않습니다.

  2. 나는 브로 메시지에 대해 예상되는 콘텐츠 형식이 응용 프로그램/브로하지만 메시지가 SCS에 의해 생산 될 때 내가 org.apache.kafka.common.serialization.Deserializer을 구현하여 디시리얼라이저를 작성하고 KafkaConsumerFactory를 구성하려고 contentType=application/x-java-object;

생각 . 누군가 도울 수 있습니까?

답변

0

Producer Properties에 표시된대로 ...producer.useNativeEncoding을 참조하십시오. 참으로 설정되면

을 useNativeEncoding

는 아웃 바운드 메시지 (예를 들어, 적절한 카프카 제조자 값 시리얼 설정) 대응 구성해야 클라이언트 라이브러리하여 ​​직렬화된다. 이 구성을 사용하면 아웃 바운드 메시지 마샬링은 바인딩의 contentType을 기반으로하지 않습니다. 네이티브 인코딩을 사용하는 경우 적절한 디코더 (예 : Kafka 소비자 가치 비 직렬화 기)를 사용하여 인바운드 메시지를 비 직렬화하는 것은 소비자의 책임입니다. 또한 기본 인코딩/디코딩을 사용하면 headerMode 속성이 무시되고 헤더가 메시지에 포함되지 않습니다.

기본값 : false.