2016-12-29 2 views
0

사람이 아래 queries.I에서 저를 도와 줄 수는 카프카 - 클라이언트 - 0.10.1.1 (단일 노드 단일 브로커) auto.create.topics.enable의카프카 클라이언트 API 질문

기본값을 사용하고 있습니다이 참입니다.

1.I는 소비를 들어

kafkaProdcuer<String,String> producer> producer... 
    producer.send(new ProducerRecord<String, String>("my- topic","message")); 
    producer.close(); 

사용하여 주제에 메시지를 보내고 :

kafkaConsumer<String,String> consumer.... 
    consumer.subscribe(Arrays.asList("my-topic")); 
    ConsumerRecords<String, String> records = consumer.poll(200); 

    while(true){ 
    for (ConsumerRecord<String, String> record : records) { 
      System.out.println(record.value()); 
     } 
    } 

문제는 내가 처음으로 소비자를 실행할 때, 그것은 값을하지 않는 것입니다. 그리고 저는 생산자를 운영하고 소비자를 다시 불러 와서 가치를 얻습니다. 프로듀서를 3 번 ​​실행해야하는 경우도 있습니다. 왜 이렇게 작동합니까? enable.auto.commit 속성이 false 인 경우

이 같은 소비자가 메시지를 여러 번 읽을 수 거짓

2) enable.auto.commit =? 1 point.How 내 소비자 코드는 내가 말 루프를 깰 수 고려

3.) 어떻게 소비자는 항상 사용)은 모든 메시지를 읽고 다음 consumer.close()

+0

kafka bin에 console-consumer가 있습니다. 소비자가 데이터를 소비 할 수없는 동안 시도해 볼 수 있습니다. 가능하면 producer.flush()를 추가하십시오. 3 번 질문에 대해서는 스트리밍 프로그램이 배치의 끝을 알 수있는 방법이 없지만 시간 초과 스레드를 설정하여 데이터가 소비되지 않는 시간 초과를 모니터링 할 수 있습니다. – Lhfcws

+0

예 bin 소비자로 테스트 한 결과 상관 ID 1을 사용하여 메타 데이터를 가져 오는 동안 오류가 발생했습니다 : {my-topic-106 = LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) – jena84

+0

데이터를 생성 했습니까? 최근에 데이터를 소비하기 전에?기본적으로 Kafka는 데이터를 3 일 동안 만 보관합니다. – Lhfcws

답변

1

1을 호출하고있다 알 수 같은 group.id가 소비자입니까? 당신은 소비하기 전에 생산하고 있습니까? 이는 소비자 그룹 및 상쇄 관리와 관련 될 수 있습니다. this answer about consumer offset behavior을 참조하십시오.

2) 의도적으로 또는 실수로 중복 된 것을 읽는 것은 확실하지 않습니다. 주제 보존 정책으로 인해 메시지가 삭제되지 않은 한 해당 위치로 이동하기 위해 항상 동일한 메시지를 다시 읽을 수 있습니다. 우연히 의미하는 경우 자동 커밋을 false로 설정하면 소비자가 커밋하지 않기 때문에 commitSync() 또는 commitAsync()를 수동으로 호출해야합니다. 어떤 경우에도 소비자가 메시지를 처리하고 커밋하기 전에 충돌 할 수있는 기회가 있습니다.이 경우 소비자가 복구 할 때 커밋 된 메시지를 다시 읽습니다. 정확하게 한 번 시멘틱을 원한다면 처리 된 메시지를 원자 적으로 저장하는 것과 같은 다른 작업을해야합니다.

3) 언급 한대로 Lhfcws에는 "모든 메시지"와 같은 개념이 없습니다.

  • 빈 레코드 인 경우 폴링으로 반환되고 일부 구성된 횟수만큼 루프를 종료하고 종료 할 때 레코드 목록이 반환하는지 확인할 수 있습니다.
  • 메시지가 정렬 된 경우 (단일 파티션에서 읽는 경우) 특별한 END_OF_DATA 메시지를 보낼 수 있습니다. 메시지를 볼 때 소비자를 닫을 수 있습니다.
  • 소비자가 여러 메시지를 읽은 다음 종료하면 다음 커밋 된 마지막 오프셋에서 계속됩니다.
+0

감사합니다 Lhfcws와 Luciano.I 지금은 2 점과 3 점에 대해 명확합니다. 1 점에 대해, 나는 프로듀서 직후에 소비자를 실행하고 있습니다. 나는 소비자 그룹을 바꾸지 않을 것입니다. 저는 bin 유틸리티를 사용하여 주제를 만들지 않습니다. producer.send 코드가 주제를 생성한다고 가정합니다. bootstrap.servers = localhost : 9092 group.id = test enable.auto.commit = true – jena84

+1

jena84, 소비자 구성에서 auto.offset.reset을 "가장 초기"로 설정하고 다시 시도해보십시오. 또한, 소비자가 재조정을 완료하기를 기다린 후 시작합니다. –

+0

굉장! 작동 했어. 고마워. 너가 오프셋 관리에 대해 준 링크에서 언급 된 이유 때문인지. 나는 또한 "가장 작은"것을 시도했다. 그것은 나를 허락하지 않았다. 새로운 소비자 API 때문입니까? – jena84