2017-11-07 15 views
1

나는 카프카에 새로 온 사람과 프로토 타입에 작업하는 카프카에 독점 스트리밍 서비스를 연결하는 데에 보낸 마지막 메시지 얻기. 는 카프카 주제

나는 우리의 사내 스트림 소비자가 연결할 때받은 마지막 메시지의 ID로 로그온 할 필요가 같은 주제에 보낸 마지막 메시지의 키를 얻기 위해 찾고 있어요.

는이 작업을 수행 할 KafkaProducer 또는 KafkaConsumer을 사용하여, 수 있습니까?

나는 소비자를 사용하여 다음을 수행하려고 시도했지만, 또한 콘솔 소비자를 실행할 때 나는 재생 메시지를 참조하십시오.

// Poll so we know we're connected 
    consumer.poll(100); 
    // Get the assigned partitions 
    Set<TopicPartition> assignedPartitions = consumer.assignment(); 
    // Seek to the end of those partitions 
    consumer.seekToEnd(assignedPartitions); 

    for(TopicPartition partition : assignedPartitions) { 
     final long offset = consumer.committed(partition).offset(); 
     // Seek to the previous message 
     consumer.seek(partition,offset - 1); 
    } 

    // Now get the last message 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     lastKey = record.key(); 
    } 
    consumer.close(); 

예상되는 동작입니까 아니면 잘못된 경로입니까? link apicommitted 방법은 주어진 파티션, 즉 대한 the last committed offset를 얻는 것입니다 의미로

답변

1

문제는 라인 final long offset = consumer.committed(partition).offset()에 : 마지막은 이미 읽은 당신의 소비자 TELL의 카프카 서버를 오프셋. 당신은 항상 오프셋 특정 읽기 때문에 은 그래서, 확실히 당신은 messages replayed을 가지고 있습니다. 제가 생각하기에 블록의 첫 번째 부분 만 제거하면됩니다. 물론

+0

, 난 내가 스파크 프로그램에서 같은 일을 달성하기 위해 노력하고 있지만, 할 수 있지 않다, Consumer.position에게 대신 – ryanpudd

+0

@ryanpudd을 확인해야합니다. 전체 코드에 대한 링크를 제공해 주시겠습니까? 감사 –