1

봄 카프카 소비자를 구축 중입니다. 나는 재시도 메커니즘을 설정했다. 재시도가 끝나면 실패한 메시지를 불필요한 편지 주제로 푸시하고 싶습니다.Kafka 소비자 - 복구 방법에서 청취자가 수신 한 매개 변수를 가져옵니다.

는이 방법은 아래의 매개 변수

public void listen(@Payload Map<String, Object> conciseMap, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     Acknowledgment ack) throws JsonProcessingException { 

를 Recover 방법의 일환으로, 나는 conciseMap 리스너 맵 또는 내 주제에 의해 수신 된 원본 메시지에 입력으로 전달 인출 할

을 가지고들을 수 있습니다. 그것을 할 수있는 방법이 있습니까?

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(conncurrency); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRetryTemplate(retryTemplate()); 
    factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge(); 

      return null; 
     } 
    }); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

답변

1

당신은이 RecoveryCallbackRetryContextconciseMap 변환 얻을 수는 없지만 변환하기 전에 주제에서 원래이다 ConsumerRecord를 검색 할 수 있습니다

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD) 
+0

감사 아르 템을. ConsumerRecord.value는 리스너 메소드에서 가져온 바이트를 제공합니까? –

+0

M-m-m. 아마도. 제공된 'Deserializer'의 결과입니다. –