2017-04-13 3 views
0

동일한 그룹 ID를 가진 모든 생산자, 1 개의 주제, 10 개의 파티션, 10 개의 파티션, 10 개의 KafkaConsumers가 하나의 컴퓨터에서 실행됩니다. 파일을 처리 할 때 제작자는 소비자가 행복하게 소비하기 시작하는 3269 개의 ​​메시지를 신속하게 작성합니다. 모든 것이 잠시 동안 잘 돌아가지만 특정 시점에 소비자는 중복을 소비하기 시작합니다. 실제로 메시지 큐를 다시 소비하기 시작한 것처럼 보입니다. 오랜 시간 동안 실행하면 데이터베이스는 동일한 데이터 항목을 6 번 이상 수신하기 시작합니다. 로깅으로 몇 가지 테스트를 한 후에는 소비자가 동일한 고유 한 메시지 이름을 사용하여 동일한 메시지를 다시 소비하는 것처럼 보입니다.Kafka 0.10.2 많은 수의 소비자가 중복 됨

내가 알 수있는 한, 어떤 균형 조정도 일어나지 않습니다. 소비자는 죽어 가거나 추가되지 않습니다. 동일한 10 명의 소비자가 프로세스를 종료 할 때까지 동일한 3269 메시지를 반복해서 사용합니다. 방금 놓아두면 소비자는 수십만 개의 레코드를 작성하여 실제로 데이터베이스에 들어가야하는 데이터 양을 엄청나게 늘릴 것입니다.

저는 카프카에게 상당히 새로운 사람입니다. 그러나 나는 왜 이런 일이 일어나고 있는지를 놓치고 있습니다. 나는 카프카가 정확히 한 번 처리하는 것을 보장하지 않는다는 것을 알고 있으며, 여기저기서 몇 권의 복제본으로 괜찮습니다. 같은 레코드를 다시 유지하지 못하게하는 코드가 있습니다. 그러나 나는 왜 소비자들이 큐를 계속해서 또 다시 소비하는지 확신 할 수 없다. 나는 카프카 메시지가 소비 된 후에는 삭제되지 않는다는 것을 알고 있지만, 모든 소비자가 같은 그룹에 속해 있다면, 오프셋은이를 방지해야합니다. 맞습니까? 나는 오프셋이 어떻게 작동하는지 조금은 이해하지만, 내가 아는 한 리 밸런싱이 없다면 리셋을해서는 안된다. 맞습니까? 그리고 메시지는 제가 말할 수있는 한 시간 초과되지 않습니다. 대기열에서 모든 것을 소비 한 다음 동일한 내용을 영원히 다시 사용하지 않고도 더 많은 메시지를 기다릴 수있는 방법이 있습니까?

는 여기에 내가 생산자와 소비자에 전달하는 proprties 있습니다 : 나에게

Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("group.id", "MyGroup"); 
     props.put("num.partitions", 10); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     MyIngester ingester = new MyIngester(args[0], props); 

답변

1

이 영수증을 인정에 문제가 될 것으로 보인다. 다음 특성을 시도하십시오.

props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "100");