2017-05-12 4 views
1

화제의 최신 메시지에서 시작되는 카프카 소비자를 갖고 싶습니다. auto.offset.reset의 값이 최신이지만, 소비자 2 일 전에 속하는 형태로 메시지를 시작하고 다음을 잡는다 있지만카프카 소비자가 최신 메시지부터 시작하지 않음

private static Properties properties = new Properties(); 
private static KafkaConsumer<String, String> consumer; 
static 
{ 
    properties.setProperty("bootstrap.servers","localhost"); 
    properties.setProperty("enable.auto.commit", "true"); 
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    properties.setProperty("group.id", "test"); 
    properties.setProperty("auto.offset.reset", "latest"); 
    consumer = new KafkaConsumer<>(properties); 

    consumer.subscribe(Collections.singletonList("mytopic")); 
} 

@Override 
public StreamHandler call() throws Exception 
{ 
    while (true) 
    { 
     ConsumerRecords<String, String> consumerRecords = consumer.poll(200); 
     Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic"); 
     for(ConsumerRecord<String, String> rec : records) 
     { 
      System.out.println(rec.value()); 
     } 
    } 
} 

: 여기

는 자바 코드 최신 메시지.

무엇이 누락 되었습니까?

답변

3

같은 group.id으로 이전이 동일한 코드를 실행할 수 있나요? auto.offset.reset 매개 변수는 이미 소비자 용으로 저장된 기존 오프셋이없는 경우에만 사용됩니다. 따라서 이전에 예제를 실행하고 2 일 전에 말한 다음 다시 실행하면 마지막으로 사용한 위치에서 시작합니다.

수동으로 주제 끝으로 이동하려면 seekToEnd()을 사용하십시오.

좀 더 철저하게 토론하려면 https://stackoverflow.com/a/32392174/1392894을 참조하십시오.

+0

감사합니다. 나는 당신이 옳다고 생각합니다! 나는 그것을 2 일 전에 사용 했었고 최근 offset에 의해 group.id가 소비 된 최신 오프셋을 의미한다는 것을 알지 못했습니다. – Ehsan