0

데모 어플리케이션의 경우, kafka 큐에서 메시지를 반환하기 위해 나머지 컨트롤러를 만들어야합니다. 나는 스프링 카프카의 참조 설명서를 읽고 소비자 구성을 구현 만든 콩을나머지 컨트롤러가 spring kafka를 통해 kafka의 레코드를 반환합니다.

@Configuration 
@EnableKafka 
public class ConsumerConfiguration { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     // list of host:port pairs used for establishing the initial connections to the Kakfa cluster 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     // allows a pool of processes to divide the work of consuming and processing records 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(
       consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Transaction.class) 
     ); 
    } 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Transaction>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Transaction> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(transactionConsumerFactory()); 

     return factory; 
    } 

    @Bean 
    public Consumer consumer() { 
     return new Consumer(); 
    } 

} 

아래로 아래

public class Consumer { 

    private CountDownLatch latch = new CountDownLatch(1); 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

    @KafkaListener(topics = "${kafka.topic.name}") 
    public void receive(Transaction transaction) { 
     latch.countDown(); 
    } 
} 

같은 다른 클래스 소비자는 어떻게 지금부터 거래를 검색 할 수있는 논리를 구현할 수있다 각각의 소비자는 컨트롤러에서 히트를 친다.

미리 감사드립니다.

답변

2

음, @KafkaListener은 카프카에서 콜백으로 레코드를 스트리밍하기 위해 독립적 인 수명이 긴 프로세스를 생성합니다. REST GET 이벤트에 대해 이야기하고 있으므로 ConsumerFactory에서 KafkaConsumer을 가져오고 해당 컨트롤러 메서드에서 poll()을 수동으로 호출하지 않으면 선택의 여지가 없습니다.

+0

답장을 보내 주신 Artem에게 감사드립니다. 가이드 또는 동일한 문서를 알려 주시기 바랍니다. – maverick

+0

어쩌면이 : http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html? –

+0

고마워, 나는 코드를보고 있었고 스프링 - 카프카를 사용하지 않고 카프카 API를 직접 사용하지 않는 것으로 보인다. 내가 맞습니까? – maverick