2017-12-19 67 views
1

다음 두 단계를 사용하여 CentOS7 인스턴스의 Confluent-3.3.0 플랫폼에서 실행중인 kafka-rest 서비스를 통해 'InventoryEvent'사용자 정의 Java 유형의 이벤트를 게시합니다.Spring-Kafka Consumer KafkaListener가 GenericMessage를 Java 객체로 변환 할 수 없습니다.

카프카 받침대

curl -X POST -H "Content-Type:application/vnd.kafka.json.v2+json" --data '{"records" : [{"value" : {"id":1231, "eventType": "inventory.transaction", "qtyLevel" : 2223, "qtyReq" : 2345}}]}' "http://localhost:8082/topics/inventory" 

이 주제

에 소비자 인스턴스를 구독에 POST JSON 이벤트에 는

명령

curl -X POST -H "Content-Type:application/vnd.kafka.v2+json" --data '{"topics" : ["inventory"]}' http://localhost:8082/consumers/inventory_consumers/instances/consumer_1/subscription 

다음 나는 JSON을 소비하고 아래와 같이 @KafkaListener 주석 소비자 리스너 방법을 통해 자바 형식으로 다시 변환해야 봄 - 카프카 응용 프로그램을 통해, 카프카 브로커에 전송 된 이벤트를 소비하고 :

public class InventoryEventReceiver { 

    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class); 

    private CountDownLatch latch = new CountDownLatch(1); 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory") 
    public void listenWithHeaders(
      @Payload InventoryEvent event, 
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 
      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
      @Header(KafkaHeaders.OFFSET) String offset 
      ) { 

     System.out.println("EVENT HAS BEEN RECEIVED "); 
     System.out.println(event.toString()); 


     ObjectMapper objectMapper = new ObjectMapper(); 
     String invEventInString = null; 
     try { 
      invEventInString = objectMapper.writeValueAsString(event); 
      System.out.println(invEventInString); 

     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

     latch.countDown(); 
    } 
} 

그러나 위의 수신기 코드를 통해 내가 시도했지만 같은 오류가받은

다른 리스너 메소드 정의를 메시지를 사용하는 동안 나는 KafkaListenerContainer에 아래의 오류 로그를 얻고있다 :

는 InventoryEvent 객체 방청

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory") 
    public void listen(ConsumerRecord<?,?> record) 

내 수신기 구성은 다음 값 InventoryEvent 소요 (에러 로그에서 큐 복용) ConsumerRecord 듣고

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory") 
    public void listenWithHeaders(
      InventoryEvent event) 

자리 표시 자. 나는 그것을 String으로 변경하고 StringJsonMessageConverter를 통해

containerFactory.setMessageConverter(new StringJsonMessageConverter()); 

을 추가했지만 동일한 오류가 발생했습니다.

MessageConverter 또는 MessageListener와 같은 기본 Spring-Kafka 구성이 누락되었거나 사용자 정의 MessageConverter를 구현하여 JSON을 Java 유형 InventoryEvent로 역 직렬화해야만합니까?

@EnableKafka 
@Configuration 
public class InventoryReceiverConfig { 

    @Bean 
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
       new JsonDeserializer<>(InventoryEvent.class)); 
    } 

    @Bean 
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); 
     containerFactory.setConsumerFactory(consumerFactory()); 
     containerFactory.setConcurrency(3); 
     containerFactory.getContainerProperties().setPollTimeout(3000); 
     return containerFactory; 
    } 

    @Bean 
    public static Map<String, Object> consumerConfigs() { 
     Map<String, Object> consumerProps = new HashMap<>(); 
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers"); 
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); 
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class); 
     return consumerProps; 
    } 

    @Bean 
    public InventoryEventReceiver receiver() { 
     return new InventoryEventReceiver(); 
    } 

} 

오류 로그 :

2017-12-19 13:49:08.671 ERROR 16965 --- [fka-listener-23] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 48, CreateTime = 1513691348668, checksum = 537414172, serialized key size = -1, serialized value size = 77, key = null, value = {id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}) 

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message 
Endpoint handler details: 
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)] 
Bean [[email protected]]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}] 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:156) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151] 
     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}] 
     ... 10 common frames omitted 
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}] 
     at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:152) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
     ... 9 common frames omitted 

2017-12-19 13:49:28.869 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet  : FrameworkServlet 'dispatcherServlet': initialization started 
2017-12-19 13:49:28.889 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet  : FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms 

답변

1

이 스택 추적을 참조하십시오 :

Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)] 

방법 서명은 listenWithHeaders(String, String, Integer, int, String)

비슷하지만 당신은 우리에게 절대적으로 다른 하나를 보여줍니다. 실제로 런타임에 적절한 코드를 사용하는지 확인하십시오. 당신이 JsonDeserializer이있는 경우

, 당신은 정말 StringJsonMessageConverter 필요하지 않지만 적절한 방법은 스택 추적과 같이

+0

내가 리스너 메소드 서명을 보여 주었다 ... 참으로 사용해야합니다.InventoryEventReceiver 클래스에서, 인수 (String, String, Integer, int, String)를 갖는 KafkaListener로 listenWithHeaders 주석을 달았습니다. - InventoryEvent 이벤트, (KafkaHeaders.RECEIVED_TOPIC) 문자열 항목, (KafkaHeaders.RECEIVED_MESSAGE_KEY) 정수 키, KafkaHeaders.RECEIVED_PARTITION_ID) int partition, (KafkaHeaders.OFFSET) 문자열 오프셋입니다. – somnathchakrabarti

+0

또한 동일한 오류로 시도한 두 개의 리스너 메소드 서명을 제공했습니다. StringJsonMessageConverter를 사용하지 않습니다. 메시지의 값 부분에 JsonSerialize/JsonDeserialize를 수행하고 있습니다. – somnathchakrabarti

+0

적절한 방법으로 JsonDeserializer에 대한 사용자 정의 MessageConverter를 의미합니까? 그렇다면 간단한 예를 들어 줄 수 있습니까? – somnathchakrabarti