화제의 최신 메시지에서 시작되는 카프카 소비자를 갖고 싶습니다. auto.offset.reset의 값이 최신이지만, 소비자 2 일 전에 속하는 형태로 메시지를 시작하고 다음을 잡는다 있지만카프카 소비자가 최신 메시지부터 시작하지 않음
private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
properties.setProperty("bootstrap.servers","localhost");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("mytopic"));
}
@Override
public StreamHandler call() throws Exception
{
while (true)
{
ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
for(ConsumerRecord<String, String> rec : records)
{
System.out.println(rec.value());
}
}
}
: 여기
는 자바 코드 최신 메시지.무엇이 누락 되었습니까?
감사합니다. 나는 당신이 옳다고 생각합니다! 나는 그것을 2 일 전에 사용 했었고 최근 offset에 의해 group.id가 소비 된 최신 오프셋을 의미한다는 것을 알지 못했습니다. – Ehsan