Spring-Kafka consumer KafkaListener unable to convert GenericMessage to Java object

I am publishing events of the custom Java type 'InventoryEvent' through the kafka-rest service running on the Confluent-3.3.0 platform on a CentOS7 instance, using the following two steps.
POST a JSON event to kafka-rest:

curl -X POST -H "Content-Type:application/vnd.kafka.json.v2+json" --data '{"records" : [{"value" : {"id":1231, "eventType": "inventory.transaction", "qtyLevel" : 2223, "qtyReq" : 2345}}]}' "http://localhost:8082/topics/inventory"

Subscribe a consumer instance to this topic:

curl -X POST -H "Content-Type:application/vnd.kafka.v2+json" --data '{"topics" : ["inventory"]}' http://localhost:8082/consumers/inventory_consumers/instances/consumer_1/subscription
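Next, I consume the events sent to the Kafka broker through a Spring-Kafka application, which has to consume the JSON and convert it back to the Java type through a @KafkaListener-annotated consumer listener method, as below: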
public class InventoryEventReceiver {

    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(
            @Payload InventoryEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) String offset
    ) {
        System.out.println("EVENT HAS BEEN RECEIVED ");
        System.out.println(event.toString());

        ObjectMapper objectMapper = new ObjectMapper();
        String invEventInString = null;
        try {
            invEventInString = objectMapper.writeValueAsString(event);
            System.out.println(invEventInString);
        } catch (IOException e) {
            e.printStackTrace();
        }

        latch.countDown();
    }
}
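The InventoryEvent class itself is not shown in the post. For readers following along, here is a minimal sketch of what such a class might look like. The field names are taken from the JSON payload posted to kafka-rest, and the toString format is modelled on the value printed in the error log further down; the field types, the accessors, and the package-private visibility (used only so the snippet compiles in any file) are assumptions.

```java
// Hypothetical sketch: the real InventoryEvent is not shown in the post.
// Field names come from the JSON payload; the types are assumptions.
class InventoryEvent {

    private long id;
    private String eventType;
    private int qtyLevel;
    private int qtyReq;

    // JSON deserializers based on Jackson generally expect a no-argument
    // constructor plus setters (or other annotated creators).
    public InventoryEvent() {
    }

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }

    public int getQtyLevel() { return qtyLevel; }
    public void setQtyLevel(int qtyLevel) { this.qtyLevel = qtyLevel; }

    public int getQtyReq() { return qtyReq; }
    public void setQtyReq(int qtyReq) { this.qtyReq = qtyReq; }

    // Mirrors the layout of the value seen in the error log:
    // {id=..., eventType='...', qtyReq='...', qtyLevel='...'}
    @Override
    public String toString() {
        return "{id=" + id
                + ", eventType='" + eventType + "'"
                + ", qtyReq='" + qtyReq + "'"
                + ", qtyLevel='" + qtyLevel + "'}";
    }
}
```

The point of the sketch is only that the value printed in the log looks like the toString output of an already constructed object rather than raw JSON text.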
While consuming the messages, I get the error log shown at the end of this post from the KafkaListenerContainer.

Taking a cue from the error log, I also tried other listener method definitions. One listens to the ConsumerRecord:

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
public void listen(ConsumerRecord<?,?> record)

Another listens to the InventoryEvent object only:

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
public void listenWithHeaders(
        InventoryEvent event)

I got the same error with these as well. In the listener code above, the value takes the InventoryEvent placeholder. I changed it to String and added a StringJsonMessageConverter through

containerFactory.setMessageConverter(new StringJsonMessageConverter());

but got the same error.

Am I missing some basic Spring-Kafka configuration such as a MessageConverter or MessageListener, or do I need to implement a custom MessageConverter to deserialize the JSON to the Java type InventoryEvent?

My receiver configuration is as follows:
@EnableKafka
@Configuration
public class InventoryReceiverConfig {

    @Bean
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(InventoryEvent.class));
    }

    @Bean
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(3);
        containerFactory.getContainerProperties().setPollTimeout(3000);
        return containerFactory;
    }

    @Bean
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        return consumerProps;
    }

    @Bean
    public InventoryEventReceiver receiver() {
        return new InventoryEventReceiver();
    }
}
Error log:
2017-12-19 13:49:08.671 ERROR 16965 --- [fka-listener-23] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 48, CreateTime = 1513691348668, checksum = 537414172, serialized key size = -1, serialized value size = 77, key = null, value = {id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [[email protected]]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:156) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:152) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    ... 9 common frames omitted
2017-12-19 13:49:28.869 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-12-19 13:49:28.889 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms
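One possible reading of the log above, offered as an assumption rather than a confirmed diagnosis: the failing method is reported with java.lang.String as its first parameter, while the payload is already a com.psl.kafka.spring.InventoryEvent. That would mean the JsonDeserializer in the consumer factory has completed the JSON-to-object step before the listener is invoked, so with that configuration a listener whose payload parameter is InventoryEvent should not need a message converter. If a String-based pipeline is preferred instead, the usual pairing is a StringDeserializer for the value plus a StringJsonMessageConverter on the container factory, with the listener parameter still declared as InventoryEvent. The sketch below shows that second pairing only; the class name StringPayloadReceiverConfig and the bean names are hypothetical, and the broker address and group id are copied from the configuration in the post.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

// Hypothetical alternative configuration (not from the original post):
// the consumer delivers the record value as a plain String, and the
// StringJsonMessageConverter maps that String to the listener's payload type.
@EnableKafka
@Configuration
public class StringPayloadReceiverConfig {

    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory_consumers");
        // Key and value are both read as Strings; no JsonDeserializer here.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stringListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());
        // Converts the JSON String to the type of the listener's payload parameter.
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}
```

A listener would then reference this factory with containerFactory="stringListenerContainerFactory" and declare InventoryEvent as its payload parameter. Combining a JsonDeserializer in the consumer factory with a String payload parameter is the mismatch that the exception message appears to describe.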
The listener method signature I have shown ... is indeed the one in use. In the InventoryEventReceiver class, I have annotated listenWithHeaders with KafkaListener, and it takes the arguments (String, String, Integer, int, String): InventoryEvent event, (KafkaHeaders.RECEIVED_TOPIC) String topic, (KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, (KafkaHeaders.RECEIVED_PARTITION_ID) int partition, (KafkaHeaders.OFFSET) String offset. – somnathchakrabarti
I have also provided the two other listener method signatures that I tried, with the same error. I am not using StringJsonMessageConverter. I am doing JsonSerialize/JsonDeserialize on the value part of the message. – somnathchakrabarti
By the proper way, do you mean a custom MessageConverter for the JsonDeserializer? If so, could you give a simple example? – somnathchakrabarti