카프카 소비자를 구축 중입니다. 나는 회복 콜백을 아래와 같이 설정했다. 수동 커밋을 활성화했습니다. 복구 콜백 메소드의 메시지를 확인하여 지연이 발생하지 않도록하는 방법은 무엇입니까?스프링 카프카 소비자 - 복구 콜백 메커니즘을 사용한 수동 커밋
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
감사합니다. Artem. 마지막 문장을 바탕으로 retrycontext에서 응답 특성을 얻을 수 있다고 가정합니다. 이걸 어떻게 찾을 수 있니? –
'(Acknowledgement) retryContext.getAttribute (RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –
대단히 감사합니다! –