다중 스레드 수신기를 만들려고하지만 모든 메시지가 동일한 스레드에서 실행됩니다. KafkaListerContainerFactory가 (올바르게) 인스턴스화 한 것이라도, 스레드 ID는 항상 동일하게 실행됩니다. 7 개의 메시지를 거의 동시에 보내면 처음 세 개는 동시에 처리하고 그 다음 세 개는 동시에 처리 한 다음 마지막 세 개는 동시에 처리 할 것으로 예상됩니다. 내가 보는 것은 완료 할 첫 번째 프로세스, 두 번째, 세 번째 등등입니다. 내가 오해하고 있거나 잘못 설정하고 있습니까?ConcurrentMessageListenerContainer가 동시 작동하지 않습니다.
이 내 청취자입니다 :
@Component
public class ExampleKafkaController {
Log log = Log.getLog(ExampleKafkaController.class);
@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;
@KafkaListener(topics = "${kafka.example.topic}")
public void listenForMessage(ConsumerRecord<?, ?> record) {
log.info("Got record:\n" + record.value());
System.out.println("Kafka Thread: " + Thread.currentThread());
System.out.println(kafkaListenerContainerFactory);
log.info("Waiting...");
waitSleep(10000);
log.info("Done!");
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.example.topic}")
public String topic;
public void send(String payload) {
log.info("sending payload='" + payload + "' to topic='" + topic + "'");
kafkaTemplate.send(topic, payload);
}
private void waitSleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
그리고 이것은 새로운 설정 내 응용 프로그램입니다 :
카프카와@SpringBootApplication
@ComponentScan("net.reigrut.internet.services.example.*")
@EntityScan("net.reigrut.internet.services.example.*")
@EnableKafka
@Configuration
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
ConsumerFactory<Integer,String> consumerFactory;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
System.out.println("===========>" + consumerFactory);
System.out.println(factory);
return factory;
}
}
굉장한 정보, 나는 그 문서들 중 하나에서 그것을 수집하지 않았다. 고맙습니다! 그리고 이해 하겠지만 소비자는 둘 이상의 파티션에서 소비 할 수 있지만 모든 소비자는 하나의 파티션 만 사용할 수 있습니다. 옳은? – cmreigrut
예, 맞습니다. 브로커는 동일한 그룹의 모든 소비자에 파티션을 배포합니다. 소비자가 시작하는 동안 여러 재조정을 피하기 위해 속성을 탐색 할 수 있습니다. [귀하의 문제를 해결하는 대답을 수락하는] (https://stackoverflow.com/help/someone-answers) 여기는 관습입니다. 그러면 다른 사람들이 답변을 찾는 데 도움이됩니다. –