2017-11-02 9 views
0

DB로 처리 한 후 각 레코드를 인정하는 Spring KafkaListener를 사용하고 있습니다. DB에 쓰는 데 문제가 있다면 우리는 오프셋을 소비자에게 저지하지 않도록 기록을 인정하지 않습니다. 이 잘 작동합니다. 이제 우리는 다음 폴링에서 실패한 메일을 다시 보내려합니다. 우리는 청취자에게 errorhandler를 추가하고 ConsumerAwareListenerErrorHandler를 호출하고 실패한 메시지 오프셋에 대해 consumer.seek()를 시도했습니다. 기대는 다음 투표 중입니다. 실패한 메시지를 받아야합니다. 이것은 일어나지 않습니다. 다음 폴링은 새 메시지 만 가져오고 실패한 메시지는 가져 오지 않습니다. 코드 조각은 아래에 나와 있습니다.봄 카프카 : 카프카 리스너 - 소비자. 독창적 인 문제

@Service 
public class KafkaConsumer { 
     @KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler") 
     public void onMessage(ConsumerRecord<Integer, String> record, 
      Acknowledgment acknowledgment) throws Exception { 

    try { 

     msg = JaxbUtil.convertJsonStringToMsg(record.value()); 
     onHandList = DCMUtil.convertMsgToOnHandDTO(msg); 

     TeradataDAO.updateData(onHandList); 
     acknowledgment.acknowledge(); 
     recordSuccess = true; 

     LOGGER.info("Message Saved in Teradata DB"); 

    } catch (Exception e) { 
     LOGGER.error("Error Processing On Hand Data ", e); 
     recordSuccess = false; 
    } 

} 

    @Bean 
    public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException { 
     return (message, exception, consumer) -> { 
      this.listen3Exception = exception; 
      MessageHeaders headers = message.getHeaders(); 
      consumer.seek(new org.apache.kafka.common.TopicPartition(
          headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), 
          headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), 
          headers.get(KafkaHeaders.OFFSET, Long.class)); 
      return null; 
     }; 
    } 
} 

    Container Class 

    @Bean 
public Map<Object,Object> consumerConfigs() { 
    Map<Object,Object> props = new HashMap<Object,Object>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      localhost:9092); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    return props; 
} 

@SuppressWarnings({ "rawtypes", "unchecked" }) 
@Bean 
public ConsumerFactory consumerFactory() { 
    return new DefaultKafkaConsumerFactory(consumerConfigs()); 
} 

@SuppressWarnings("unchecked") 
@Bean 
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
kafkaManualAckListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

답변

0

이 같은 일을 해야하는 :

오류 처리기가 이전 여론 조사에서 추가 기록을 폐기하려는 경우 예외를 throw 할 필요가있다.

오류를 처리 중이므로 컨테이너는 아무것도 모르고 계속해서 폴링의 나머지 레코드로 청취자를 호출합니다.

그렇습니다. 오류 처리기에서 throw 된 예외도 컨테이너가 무시한다는 것을 알았습니다. 오류 처리기가 Error 예외를 발생 시키면 버립니다. 나는이 문제를 열어 놓을 것이다.

또 다른 해결 방법은 수신기 메서드 서명에 Consumer을 추가하고 거기에서 찾기를 수행하여 예외를 throw하는 것입니다. 오류 처리기가 없으면 일} 처리의 나머지는 버려집니다.

보정

용기가 더 ErrorHandler이 없으면

나머지 레코드를 일으킬 것이다 ListenerErrorHandler 의해 슬로우 Throwable 폐기한다.

+0

난 내 리스너에 추가 된 소비자를 시도하고 예외 @KafkaListener에 (주제 = ("$ {kafka.input.stream.topic}"), containerFactory는 = "kafkaManualAckListenerContainerFactory") \t \t 공공 무효의 onMessage를 추구 않았다 (ConsumerRecord <정수, 문자열> 기록은 \t \t \t \t 승인 승인은 소비자 소비자)) { \t \t \t consumer.seek (새 org.apache.kafka.common.TopicPartition (기록을 예외 { } 캐치 (예외 전자를 발생 .topic(), record.partition()), record.offset()); \t \t \t throw e; 오류 핸들러가 null 대신 LoggingErrorHandler()로 표시됩니다. – SKumar

+0

주석에 코드를 넣지 마십시오. 그것은 읽기가 어렵습니다. 대신 질문을 편집하십시오. 네; 나는 https://github.com/spring-projects/spring-kafka/issues/470에서 일하기 시작했다. 다음날에 수정이 있기를 바랄 것이다. –

+0

나는 지금 당장은 ConsumerAwareErrorHandler (리스너 오류 처리기가 아닌 컨테이너에서)를 사용하고'Error'를 던지면된다고 생각한다. –