2017-10-01 4 views
1

카프카 0.10.2.1 클러스터를 사용하고 있습니다. 나는 특정 오프셋을 찾기 위해 Kafka의 offsetForTimes API를 사용하고 있으며 최종 타임 스탬프에 도달했을 때 루프를 브레이크 아웃하려고합니다.카프카는 타임 스탬프, 소비자 루프별로 레코드를 가져옵니다.

내 코드는 다음과 같다 :

  1. consumer.poll도 1000 밀리 초 실패 : 나는 다음과 같은 문제에 직면

    //package kafka.ex.test; 
    
    import java.util.*; 
    
    
    
    import org.apache.kafka.clients.consumer.KafkaConsumer; 
    import org.apache.kafka.clients.consumer.ConsumerRecords; 
    import org.apache.kafka.clients.consumer.ConsumerRecord; 
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp; 
    import org.apache.kafka.common.PartitionInfo; 
    import org.apache.kafka.common.TopicPartition; 
    
    public class ConsumerGroup { 
    
    
        public static OffsetAndTimestamp fetchOffsetByTime(KafkaConsumer<Long, String> consumer , TopicPartition partition , long startTime){ 
    
         Map<TopicPartition, Long> query = new HashMap<>(); 
         query.put(
           partition, 
           startTime); 
    
         final Map<TopicPartition, OffsetAndTimestamp> offsetResult = consumer.offsetsForTimes(query); 
         if(offsetResult == null || offsetResult.isEmpty()) { 
         System.out.println(" No Offset to Fetch "); 
         System.out.println(" Offset Size "+offsetResult.size()); 
    
    
    
         return null; 
         } 
         final OffsetAndTimestamp offsetTimestamp = offsetResult.get(partition); 
         if(offsetTimestamp == null){ 
         System.out.println("No Offset Found for partition : "+partition.partition()); 
         } 
         return offsetTimestamp; 
        } 
    
        public static KafkaConsumer<Long, String> assignOffsetToConsumer(KafkaConsumer<Long, String> consumer, String topic , long startTime){ 
         final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); 
         System.out.println("Number of Partitions : "+partitionInfoList.size()); 
         final List<TopicPartition> topicPartitions = new ArrayList<>(); 
         for (PartitionInfo pInfo : partitionInfoList) { 
         TopicPartition partition = new TopicPartition(topic, pInfo.partition()); 
         topicPartitions.add(partition); 
         } 
         consumer.assign(topicPartitions); 
         for(TopicPartition partition : topicPartitions){ 
         OffsetAndTimestamp offSetTs = fetchOffsetByTime(consumer, partition, startTime); 
    
    
         if(offSetTs == null){ 
          System.out.println("No Offset Found for partition : " + partition.partition()); 
          consumer.seekToEnd(Arrays.asList(partition)); 
         }else { 
          System.out.println(" Offset Found for partition : " +offSetTs.offset()+" " +partition.partition()); 
          System.out.println("FETCH offset success"+ 
            " Offset " + offSetTs.offset() + 
            " offSetTs " + offSetTs); 
          consumer.seek(partition, offSetTs.offset()); 
         } 
         } 
         return consumer; 
        } 
    
        public static void main(String[] args) throws Exception { 
    
    
         String topic = args[0].toString(); 
         String group = args[1].toString(); 
    
         long start_time_Stamp = Long.parseLong(args[3].toString()); 
         String bootstrapServers = args[2].toString(); 
         long end_time_Stamp = Long.parseLong(args[4].toString()); 
         Properties props = new Properties(); 
         boolean reachedEnd = false; 
    
         props.put("bootstrap.servers", bootstrapServers); 
         props.put("group.id", group); 
         props.put("enable.auto.commit", "true"); 
         props.put("auto.commit.interval.ms", "1000"); 
         props.put("session.timeout.ms", "30000"); 
         props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer"); 
         props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer"); 
    
         KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props); 
         assignOffsetToConsumer(consumer, topic, start_time_Stamp); 
    
    
         System.out.println("Subscribed to topic " + topic); 
         int i = 0; 
    
         int arr[] = {0,0,0,0,0}; 
         while (true) { 
         ConsumerRecords<Long, String> records = consumer.poll(6000); 
         int count= 0; 
         long lasttimestamp = 0; 
         long lastOffset = 0; 
          for (ConsumerRecord<Long, String> record : records) { 
    
           count++; 
    
           if(arr[record.partition()] == 0){ 
            arr[record.partition()] =1; 
           } 
    
    
           if (record.timestamp() >= end_time_Stamp) { 
            reachedEnd = true; 
            break; 
           } 
    
    
           System.out.println("record=>"+" offset=" 
             +record.offset() 
             + " timestamp="+record.timestamp() 
             + " :"+record); 
           System.out.println("recordcount = "+count+" bitmap"+Arrays.toString(arr)); 
    
          } 
    
         if (reachedEnd) break; 
         if (records == null || records.isEmpty()) break; // dont wait for records 
         } 
    
        } 
    
    
    
    } 
    

    . 내가 1000 밀리 세컨드를 사용한다면 루프에서 몇 번 폴링해야만했다. 나는 지금 매우 큰 가치가있다. 그러나 이미 파티션에서 관련 오프셋을 찾았습니까? 폴링 시간 초과를 안정적으로 설정하여 데이터가 즉시 반환되도록하는 방법은 무엇입니까?

  2. 데이터가 반환 될 때 항상 모든 파티션에서 나오는 것은 아닙니다. 모든 파티션에서 데이터가 반환되는 경우에도 모든 레코드가 반환되는 것은 아닙니다. 주제에있는 레코드의 양은 1000 개 이상입니다. 그러나 루프에서 실제로 페치되고 인쇄되는 레코드의 양은 적습니다 (~ 200). 현재 카프카 API 사용에 문제가 있습니까?

방법 확실 루프 탈옥

하지 일찍 시작과 끝 타임 스탬프 사이의 모든 데이터를 획득하는 데? 여론 조사 당 가져온 기록

답변

1
  1. 금액은 소비자 설정

  2. 파티션 중 하나는 당신이 원하는되지 않습니다 endtimestamp에 도달 할 때 루프를 깨고에 따라 달라집니다. 폴링 루프를 종료하기 전에 모든 파티션을 종료해야하는지 확인해야합니다.

  3. 폴링 호출은 비동기 호출이며 페치 요청 및 응답은 노드별로 있으므로 폴링에서 모든 응답을 얻을 수도 있고받지 않을 수도 있습니다 중개인 응답 시간