I am getting the error below in the listener method of my consumer class InventoryEventReceiver. Any idea why the NullPointerException is being thrown? I am publishing just two InventoryEvent objects.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method threw java.lang.NullPointerException

Any quick help would be appreciated.

Listener method

public class InventoryEventReceiver { 

    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class); 

    private CountDownLatch latch = new CountDownLatch(1); 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory") 
    public void listenWithHeaders(
      InventoryEvent event) { 

     System.out.println("EVENT HAS BEEN RECEIVED by listenWithHeaders(InventoryEvent)"); 
     System.out.println(event.toString()); 


     log.info(System.currentTimeMillis() + "-- Received Event :\"" + event + " for topic : inventory"); 
     System.out.println("Sending event to webhook triggers ... "); 
     KafkaWebhookServiceImpl webhookService = new KafkaWebhookServiceImpl(); 

     List<WebhookRequestBody> listWebhooks = webhookService.getAllWebhooksForTopic("inventory"); 
     System.out.println("Number of registered webhooks for topic \"inventory\" : " + listWebhooks.size()); 

     CountDownLatch countLatch = new CountDownLatch(listWebhooks.size()); 

     for(WebhookRequestBody w : listWebhooks) { 
      Executors.newSingleThreadExecutor().execute(new InventoryEventProcessor(countLatch, event, w)); 
     } 

     try { 
      countLatch.await(); // wait until countLatch counted down to 0 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Events SENT to all listening webhook triggers. "); 

     latch.countDown(); 
    } 
} 

KafkaWebhookServiceImpl class

@Service("webhookService") 
@Transactional 
public class KafkaWebhookServiceImpl implements KafkaWebhookService { 

    @Autowired 
    private KafkaWebhookRepository webhookRepository; 

    @Override 
    public List<WebhookRequestBody> getAllWebhooksForTopic(String topic) { 
     return webhookRepository.findByTopic(topic); // <-- ERROR: line 45
    } 
} 

I am publishing the two records below through the Kafka REST Proxy for my consumer class:

curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"InventoryEvent\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"eventType\", \"type\": \"string\"},{\"name\": \"qtyReq\", \"type\": \"int\"},{\"name\": \"qtyLevel\", \"type\": \"int\"}]}", "records": [{"value": {"id": 6122,"eventType":"inventory.transaction","qtyReq": 34,"qtyLevel": 129}},{"value": {"id": 7798,"eventType":"inventory.transaction","qtyReq": 5,"qtyLevel": 27}}]}' http://localhost:8082/topics/inventory 

Error log

EVENT HAS BEEN RECEIVED by listenWithHeaders(InventoryEvent) 
InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'] 
2017-12-29 10:51:22.375 INFO 12418 --- [ntainer#0-0-C-1] c.p.kafka.spring.InventoryEventReceiver : 1514544682375-- Received Event :"InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'] for topic : inventory 
Sending event to webhook triggers ... 
2017-12-29 10:51:22.376 ERROR 12418 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 23, CreateTime = 1514544682080, checksum = 1801448922, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27']) 

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent)' threw exception; nested exception is java.lang.NullPointerException 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151] 
     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 
Caused by: java.lang.NullPointerException: null 
     at com.psl.kafka.rest.KafkaWebhookServiceImpl.getAllWebhooksForTopic(KafkaWebhookServiceImpl.java:45) ~[classes/:0.0.1-SNAPSHOT] 
     at com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(InventoryEventReceiver.java:126) ~[classes/:0.0.1-SNAPSHOT] 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151] 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151] 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151] 
     at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     ... 8 common frames omitted 
Which line corresponds to 'KafkaWebhookServiceImpl.java:45'? – Natalia

I have added 'KafkaWebhookServiceImpl.java' to the main post. – somnathchakrabarti

Answer

The problem is this line in the InventoryEventReceiver class: KafkaWebhookServiceImpl webhookService = new KafkaWebhookServiceImpl();

If you want Spring to manage the dependencies (the autowiring process), you should not create the bean yourself. Because the instance is created with new, webhookRepository is never set and stays null, so in this code

@Override 
public List<WebhookRequestBody> getAllWebhooksForTopic(String topic) { 
    return webhookRepository.findByTopic(topic); // <-- ERROR: line 45
} 

you get the NPE.

Rewrite the InventoryEventReceiver class so that the KafkaWebhookService instance is injected by Spring rather than created inside the listener method.
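
A minimal sketch of the difference, written in plain Java so that it runs without Spring on the classpath. Every type here (InjectionSketch, WebhookRepository, FieldInjectedService, Receiver) is a hypothetical stand-in rather than the asker's code, and containerManagedService() only imitates what the Spring container does when it autowires the repository.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InjectionSketch {

    // Hypothetical stand-in for KafkaWebhookRepository.
    interface WebhookRepository {
        List<String> findByTopic(String topic);
    }

    // Mirrors the shape of KafkaWebhookServiceImpl: the repository field is
    // expected to be filled in from outside (field injection), never by the class.
    static class FieldInjectedService {
        WebhookRepository webhookRepository; // stays null unless something sets it

        List<String> getAllWebhooksForTopic(String topic) {
            return webhookRepository.findByTopic(topic); // NPE when the field is null
        }
    }

    // Receiver that is handed its collaborator instead of creating it with `new`.
    static class Receiver {
        private final FieldInjectedService webhookService;

        Receiver(FieldInjectedService webhookService) {
            this.webhookService = webhookService;
        }

        int countWebhooks(String topic) {
            return webhookService.getAllWebhooksForTopic(topic).size();
        }
    }

    // Plays the role of the container: builds the service and sets its dependency.
    static FieldInjectedService containerManagedService() {
        FieldInjectedService service = new FieldInjectedService();
        service.webhookRepository = topic -> {
            if ("inventory".equals(topic)) {
                // Placeholder URLs, invented for this sketch.
                return Arrays.asList("http://example.invalid/hook-a",
                                     "http://example.invalid/hook-b");
            }
            return Collections.<String>emptyList();
        };
        return service;
    }

    // What the original listener effectively does: `new` bypasses the container,
    // so nobody ever assigns the repository field.
    static boolean handMadeServiceThrowsNpe() {
        try {
            new FieldInjectedService().getAllWebhooksForTopic("inventory");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("hand-made service throws NPE: " + handMadeServiceThrowsNpe());
        Receiver receiver = new Receiver(containerManagedService());
        System.out.println("webhooks for inventory: " + receiver.countWebhooks("inventory"));
    }
}
```

Running main reports true for the hand-made instance and 2 for the injected one. In the real application the same shape would mean making KafkaWebhookService a field of InventoryEventReceiver that Spring fills in (for example through a constructor parameter or an @Autowired field), assuming the receiver itself is registered as a Spring bean.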