2016-10-09 7 views
3

나는 다음과 같은 시나리오 한 :봄 클라우드 스트림 카프카> 플루 REST 프록시에서 아 브로 메시지를 소모

내 응용 프로그램은 다음과 같습니다

@SpringBootApplication 
@EnableBinding(Sink.class) 
public class MyApplication { 
    private static Logger log = LoggerFactory.getLogger(MyApplication.class); 

    public static void main(String[] args) { 
    SpringApplication.run(MyApplication.class, args); 
    } 

    @StreamListener(Sink.INPUT) 
    public void myMessageSink(MyMessage message) { 
    log.info("Received new message: {}", message); 
    } 
} 

반면 MyMessage는 Avro 스키마에서 Avro에 의해 작성된 클래스입니다.

내 application.properties은 다음과 같습니다

spring.cloud.stream.bindings.input.destination=myTopic 
spring.cloud.stream.bindings.input.group=${spring.application.name} 
spring.cloud.stream.bindings.input.contentType=application/*+avro 

내 문제는 새로운 메시지를 수신 할 때마다 다음과 같은 예외가 발생하는 것이 지금 : 내가 이해에서

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27 
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    ... 
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27 
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1] 
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    ... 35 common frames omitted 

, 문제는 Confluent 스택이 메시지 페이로드의 일부로 메시지 스키마의 ID를 포함하고 클라이언트가 스키마 ID 다음에 실제 Avro 메시지를 읽는 것으로 예상된다는 것입니다. Confluent의 KafkaAvroDeserializer를 사용하기 위해 Kafka 바인딩을 구성해야하지만이 작업을 수행하는 방법을 알 수는 없습니다.

나는 또한 @EnableSchemaRegistry 주석으로 주위를 연주하고 ConfluentSchemaRegistryClient를 구성하려고

(는 브로 인코딩에 문제가 될 것 같지 않습니다 그래서 완벽하게 정상적으로 플루의 아 브로 콘솔 소비자를 사용하여 메시지를 검색 할 수 있습니다) bean과 같이 보이지만 실제로는 역 직렬화가 아닌 스키마를 저장/검색하는 부분 만 제어합니다.

어떻게 든 작동하고 있습니까?

답변

0

per-binding 속성 spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializerConfluent's KafkaAvroDeserializer class name으로 설정할 때 작동합니까?

+1

'KafkaMessageChannelBinder'의 소비자 종단점은 항상 키/값 직렬화기를 위해'ByteArrayDeserializer'를 사용합니다. 이는 기본적으로'ByteArraySerializer'를 사용하는'KafkaMessageChannelBinder' 제작자의 결과 일 수 있습니다. 비 Spring Cloud Stream 기반 프로듀서의 경우, 소비자는 위 속성을 사용하여 필요한 디시리얼라이저를 오버라이드 할 수 있어야합니다. –

+0

불행히도, 나는 이미 그것을 시도했습니다. [소스 코드] (https : // github.spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/바인더/kafka/KafkaMessageChannelBinder. java # L254) 디시리얼라이저는 기본 ByteArrayDeserializer로 하드 코딩됩니다. – neptoon

+1

예, 여기에서 문제를 만드십시오 : https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues. 우리는 거기에서 그것을 추적 할 수 있습니다. 감사! –

0

일종의 대답은 내 자신의 질문입니다. 지금 내가 한 것은 Avro 디코더에 전달하기 전에 모든 메시지의 처음 4 바이트를 제거하는 MessageConverter를 구현하는 것입니다. 코드는 주로 봄 클라우드 스트림의 AbstractAvroMessageConverter에서 가져온 것입니다 :

public class ConfluentAvroSchemaMessageConverter extends AvroSchemaMessageConverter { 

public ConfluentAvroSchemaMessageConverter() { 
    super(new MimeType("application", "avro+confluent")); 
} 

@Override 
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) { 
    Object result = null; 
    try { 
     byte[] payload = (byte[]) message.getPayload(); 

     // byte array to contain the message without the confluent header (first 4 bytes) 
     byte[] payloadWithoutConfluentHeader = new byte[payload.length - 4]; 
     ByteBuffer buf = ByteBuffer.wrap(payload); 
     MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders()); 
     if (mimeType == null) { 
      if (conversionHint instanceof MimeType) { 
       mimeType = (MimeType) conversionHint; 
      } 
      else { 
       return null; 
      } 
     } 

     // read first 4 bytes and copy the rest to the new byte array 
     // see https://groups.google.com/forum/#!topic/confluent-platform/rI1WNPp8DJU 
     buf.getInt(); 
     buf.get(payloadWithoutConfluentHeader); 

     Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType); 
     Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass); 
     DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema); 
     Decoder decoder = DecoderFactory.get().binaryDecoder(payloadWithoutConfluentHeader, null); 
     result = reader.read(null, decoder); 
    } 
    catch (IOException e) { 
      throw new MessageConversionException(message, "Failed to read payload", e); 
    } 
    return result; 

} 

나는 다음 application.properties를 통해 응용 프로그램/브로 + 합류에 들어오는 카프카 주제의 콘텐츠 유형을 설정합니다.

적어도 Confluent 스택으로 인코딩 된 메시지는 검색 할 수 있지만 물론 스키마 레지스트리와 상호 작용하지는 않습니다.