1
봄 카프카 소비자를 구축 중입니다. 나는 재시도 메커니즘을 설정했다. 재시도가 끝나면 실패한 메시지를 불필요한 편지 주제로 푸시하고 싶습니다.Kafka 소비자 - 복구 방법에서 청취자가 수신 한 매개 변수를 가져옵니다.
는이 방법은 아래의 매개 변수
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack) throws JsonProcessingException {
를 Recover 방법의 일환으로, 나는 conciseMap 리스너 맵 또는 내 주제에 의해 수신 된 원본 메시지에 입력으로 전달 인출 할
을 가지고들을 수 있습니다. 그것을 할 수있는 방법이 있습니까?@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
감사 아르 템을. ConsumerRecord.value는 리스너 메소드에서 가져온 바이트를 제공합니까? –
M-m-m. 아마도. 제공된 'Deserializer'의 결과입니다. –