2017-12-19 22 views
0

다중 스레드 수신기를 만들려고하지만 모든 메시지가 동일한 스레드에서 실행됩니다. KafkaListerContainerFactory가 (올바르게) 인스턴스화 한 것이라도, 스레드 ID는 항상 동일하게 실행됩니다. 7 개의 메시지를 거의 동시에 보내면 처음 세 개는 동시에 처리하고 그 다음 세 개는 동시에 처리 한 다음 마지막 세 개는 동시에 처리 할 것으로 예상됩니다. 내가 보는 것은 완료 할 첫 번째 프로세스, 두 번째, 세 번째 등등입니다. 내가 오해하고 있거나 잘못 설정하고 있습니까?ConcurrentMessageListenerContainer가 동시 작동하지 않습니다.

이 내 청취자입니다 :

@Component 
public class ExampleKafkaController { 
    Log log = Log.getLog(ExampleKafkaController.class); 

    @Autowired 
    private KafkaListenerContainerFactory kafkaListenerContainerFactory; 

    @KafkaListener(topics = "${kafka.example.topic}") 
    public void listenForMessage(ConsumerRecord<?, ?> record) { 
     log.info("Got record:\n" + record.value()); 
     System.out.println("Kafka Thread: " + Thread.currentThread()); 
     System.out.println(kafkaListenerContainerFactory); 

     log.info("Waiting..."); 
     waitSleep(10000); 

     log.info("Done!"); 
    } 

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate; 

    @Value("${kafka.example.topic}") 
    public String topic;  

    public void send(String payload) { 
     log.info("sending payload='" + payload + "' to topic='" + topic + "'"); 
     kafkaTemplate.send(topic, payload); 
    } 

    private void waitSleep(long ms) { 
     try { 
      Thread.sleep(ms); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

그리고 이것은 새로운 설정 내 응용 프로그램입니다 :

카프카와
@SpringBootApplication 
@ComponentScan("net.reigrut.internet.services.example.*") 
@EntityScan("net.reigrut.internet.services.example.*") 
@EnableKafka 
@Configuration 
public class Application { 
    public static void main(String[] args) { 
     SpringApplication.run(Application.class, args); 
    } 

    @Autowired 
    ConsumerFactory<Integer,String> consumerFactory; 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory); 
     factory.setConcurrency(3); 
     System.out.println("===========>" + consumerFactory); 
     System.out.println(factory); 
     return factory; 
    } 
} 

답변

1

, 동시성이 주제에 파티션의 수에 제한됩니다. 파티션이 하나만있는 경우 컨테이너의 동시성 설정에 관계없이 하나의 스레드에서만 메시지를받습니다.

원하는 동시성 수보다 커야 할 파티션 수를 프로비저닝해야합니다. 파티션의 수가 동시성보다 큰 경우 파티션은 소비자 스레드에 분산됩니다.

그룹의 한 소비자 만 파티션에서 소비 할 수 있습니다.

+0

굉장한 정보, 나는 그 문서들 중 하나에서 그것을 수집하지 않았다. 고맙습니다! 그리고 이해 하겠지만 소비자는 둘 이상의 파티션에서 소비 할 수 있지만 모든 소비자는 하나의 파티션 만 사용할 수 있습니다. 옳은? – cmreigrut

+0

예, 맞습니다. 브로커는 동일한 그룹의 모든 소비자에 파티션을 배포합니다. 소비자가 시작하는 동안 여러 재조정을 피하기 위해 속성을 탐색 할 수 있습니다. [귀하의 문제를 해결하는 대답을 수락하는] (https://stackoverflow.com/help/someone-answers) 여기는 관습입니다. 그러면 다른 사람들이 답변을 찾는 데 도움이됩니다. –