1
나는 카프카에 새로 온 사람과 프로토 타입에 작업하는 카프카에 독점 스트리밍 서비스를 연결하는 데에 보낸 마지막 메시지 얻기. 는 카프카 주제
나는 우리의 사내 스트림 소비자가 연결할 때받은 마지막 메시지의 ID로 로그온 할 필요가 같은 주제에 보낸 마지막 메시지의 키를 얻기 위해 찾고 있어요. 는이 작업을 수행 할 KafkaProducer 또는 KafkaConsumer을 사용하여, 수 있습니까?나는 소비자를 사용하여 다음을 수행하려고 시도했지만, 또한 콘솔 소비자를 실행할 때 나는 재생 메시지를 참조하십시오.
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
예상되는 동작입니까 아니면 잘못된 경로입니까? link api가 committed
방법은 주어진 파티션, 즉 대한 the last committed offset
를 얻는 것입니다 의미로
, 난 내가 스파크 프로그램에서 같은 일을 달성하기 위해 노력하고 있지만, 할 수 있지 않다, Consumer.position에게 대신 – ryanpudd
@ryanpudd을 확인해야합니다. 전체 코드에 대한 링크를 제공해 주시겠습니까? 감사 –