1

카프카 소비자를 구축 중입니다. 나는 회복 콜백을 아래와 같이 설정했다. 수동 커밋을 활성화했습니다. 복구 콜백 메소드의 메시지를 확인하여 지연이 발생하지 않도록하는 방법은 무엇입니까?스프링 카프카 소비자 - 복구 콜백 메커니즘을 사용한 수동 커밋

@Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConcurrency(conncurrency); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setRetryTemplate(retryTemplate()); 
       factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      return null; 
     } 
    }); 
     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

답변

2

귀하의 질문이 너무 광범위합니다. 좀 더 구체적이어야합니다.

소비 오류 동안 재 시도가 어려워 질 때 할 수있는 일은 프레임 워크에 어떤 가정도 없습니다.

는 당신이 전혀 그 RecoveryCallback 무엇인지 이해하기 위해 Spring Retry 프로젝트에서 시작해야한다고 생각하고 그것을 작동하는 방법 : 템플릿을 중단하기로 결정하기 전에 비즈니스 로직이 성공하지 못한 경우

는, 다음 클라이언트가

복구 콜백을 통해 대체 처리를 수행 할 수있는 기회가 주어졌습니다.

RetryContext이 있습니다

/** 
* Accessor for the exception object that caused the current retry. 
* 
* @return the last exception that caused a retry, or possibly null. It will be null 
* if this is the first attempt, but also if the enclosing policy decides not to 
* provide it (e.g. because of concerns about memory usage). 
*/ 
Throwable getLastThrowable(); 

또한 봄 카프카가 다루는 그 RetryContext에 추가 속성을 채우는 RecoveryCallback에서 다음 RecoveryCallback에 전달 된 RetryContexthttps://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

내용 것 청취자의 유형에 의존한다. 컨텍스트에는 항상 오류가 발생한 레코드 인 record 속성이 있습니다. 청취자가 인식하고 있거나 소비자가 알고있는 경우 추가 속성 acknowledgment 및/또는 consumer을 사용할 수 있습니다. 편의상 RetryingAcknowledgingMessageListenerAdapter은 이러한 키에 정적 상수를 제공합니다. 자세한 정보는 javadocs를 참조하십시오.

+0

감사합니다. Artem. 마지막 문장을 바탕으로 retrycontext에서 응답 특성을 얻을 수 있다고 가정합니다. 이걸 어떻게 찾을 수 있니? –

+0

'(Acknowledgement) retryContext.getAttribute (RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –

+0

대단히 감사합니다! –