나는 다음과 같은 시나리오 한 :봄 클라우드 스트림 카프카> 플루 REST 프록시에서 아 브로 메시지를 소모
- 프로듀서 브로로 (플루의 스키마 레지스트리에 스키마를 등록) 플루의 REST 프록시를 통해 카프카 주제에 메시지를 인코딩 보냅니다 http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
- 봄 클라우드 스트림 활성화 메시지에 설명 된 새 메시지 에 대한 주제에 수신은
내 응용 프로그램은 다음과 같습니다
@SpringBootApplication
@EnableBinding(Sink.class)
public class MyApplication {
private static Logger log = LoggerFactory.getLogger(MyApplication.class);
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void myMessageSink(MyMessage message) {
log.info("Received new message: {}", message);
}
}
반면 MyMessage는 Avro 스키마에서 Avro에 의해 작성된 클래스입니다.
내 application.properties은 다음과 같습니다
spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro
내 문제는 새로운 메시지를 수신 할 때마다 다음과 같은 예외가 발생하는 것이 지금 : 내가 이해에서
org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
... 35 common frames omitted
, 문제는 Confluent 스택이 메시지 페이로드의 일부로 메시지 스키마의 ID를 포함하고 클라이언트가 스키마 ID 다음에 실제 Avro 메시지를 읽는 것으로 예상된다는 것입니다. Confluent의 KafkaAvroDeserializer를 사용하기 위해 Kafka 바인딩을 구성해야하지만이 작업을 수행하는 방법을 알 수는 없습니다.
나는 또한 @EnableSchemaRegistry 주석으로 주위를 연주하고 ConfluentSchemaRegistryClient를 구성하려고
(는 브로 인코딩에 문제가 될 것 같지 않습니다 그래서 완벽하게 정상적으로 플루의 아 브로 콘솔 소비자를 사용하여 메시지를 검색 할 수 있습니다) bean과 같이 보이지만 실제로는 역 직렬화가 아닌 스키마를 저장/검색하는 부분 만 제어합니다.
어떻게 든 작동하고 있습니까?
'KafkaMessageChannelBinder'의 소비자 종단점은 항상 키/값 직렬화기를 위해'ByteArrayDeserializer'를 사용합니다. 이는 기본적으로'ByteArraySerializer'를 사용하는'KafkaMessageChannelBinder' 제작자의 결과 일 수 있습니다. 비 Spring Cloud Stream 기반 프로듀서의 경우, 소비자는 위 속성을 사용하여 필요한 디시리얼라이저를 오버라이드 할 수 있어야합니다. –
불행히도, 나는 이미 그것을 시도했습니다. [소스 코드] (https : // github.spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/바인더/kafka/KafkaMessageChannelBinder. java # L254) 디시리얼라이저는 기본 ByteArrayDeserializer로 하드 코딩됩니다. – neptoon
예, 여기에서 문제를 만드십시오 : https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues. 우리는 거기에서 그것을 추적 할 수 있습니다. 감사! –