DB로 처리 한 후 각 레코드를 인정하는 Spring KafkaListener를 사용하고 있습니다. DB에 쓰는 데 문제가 있다면 우리는 오프셋을 소비자에게 저지하지 않도록 기록을 인정하지 않습니다. 이 잘 작동합니다. 이제 우리는 다음 폴링에서 실패한 메일을 다시 보내려합니다. 우리는 청취자에게 errorhandler를 추가하고 ConsumerAwareListenerErrorHandler를 호출하고 실패한 메시지 오프셋에 대해 consumer.seek()를 시도했습니다. 기대는 다음 투표 중입니다. 실패한 메시지를 받아야합니다. 이것은 일어나지 않습니다. 다음 폴링은 새 메시지 만 가져오고 실패한 메시지는 가져 오지 않습니다. 코드 조각은 아래에 나와 있습니다.봄 카프카 : 카프카 리스너 - 소비자. 독창적 인 문제
@Service
public class KafkaConsumer {
@KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler")
public void onMessage(ConsumerRecord<Integer, String> record,
Acknowledgment acknowledgment) throws Exception {
try {
msg = JaxbUtil.convertJsonStringToMsg(record.value());
onHandList = DCMUtil.convertMsgToOnHandDTO(msg);
TeradataDAO.updateData(onHandList);
acknowledgment.acknowledge();
recordSuccess = true;
LOGGER.info("Message Saved in Teradata DB");
} catch (Exception e) {
LOGGER.error("Error Processing On Hand Data ", e);
recordSuccess = false;
}
}
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException {
return (message, exception, consumer) -> {
this.listen3Exception = exception;
MessageHeaders headers = message.getHeaders();
consumer.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
}
Container Class
@Bean
public Map<Object,Object> consumerConfigs() {
Map<Object,Object> props = new HashMap<Object,Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
localhost:9092);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@SuppressWarnings("unchecked")
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
난 내 리스너에 추가 된 소비자를 시도하고 예외 @KafkaListener에 (주제 = ("$ {kafka.input.stream.topic}"), containerFactory는 = "kafkaManualAckListenerContainerFactory") \t \t 공공 무효의 onMessage를 추구 않았다 (ConsumerRecord <정수, 문자열> 기록은 \t \t \t \t 승인 승인은 소비자 소비자)) { \t \t \t consumer.seek (새 org.apache.kafka.common.TopicPartition (기록을 예외 { } 캐치 (예외 전자를 발생 .topic(), record.partition()), record.offset()); \t \t \t throw e; 오류 핸들러가 null 대신 LoggingErrorHandler()로 표시됩니다. – SKumar
주석에 코드를 넣지 마십시오. 그것은 읽기가 어렵습니다. 대신 질문을 편집하십시오. 네; 나는 https://github.com/spring-projects/spring-kafka/issues/470에서 일하기 시작했다. 다음날에 수정이 있기를 바랄 것이다. –
나는 지금 당장은 ConsumerAwareErrorHandler (리스너 오류 처리기가 아닌 컨테이너에서)를 사용하고'Error'를 던지면된다고 생각한다. –