2016-11-21 4 views
0

파일에서 카프카 항목으로 데이터를 쓰려고합니다. 내 코드는 다음과 같습니다.Kafka 제작자가 메시지를 건너 뛰는 중

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", <bootstrapServers>); 
    properties.put("key.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("value.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("retries",100); 
    properties.put("linger.ms",5); 
    properties.put("acks", "all"); 

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties); 

    try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) { 
     String line; 
     int count = 0; 
     while ((line = bf.readLine()) != null) { 
      count++; 
      producer.send(new ProducerRecord<>(topicName, line)); 
     } 
    producer.flush(); 
     Logger.log("Done producing data messages. Total no of records produced:" + count); 
    } catch (InterruptedException | ExecutionException | IOException e) { 
     Throwables.propagate(e); 
    } finally { 
     producer.close(); 
    } 

데이터 크기가 1 백만 레코드를 초과합니다. 위의 명령의

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name> 

출력 :

내가이 다음 명령을 사용하여 브로커에 대한 데이터의 오프셋을 확인할 때 (5,00,000 주위) 항목에 기록되는 메시지의 절반 만이있다

topic_name:1:292954 
topic_name:0:296787 

접근 방식을 변경하면 모든 항목이 주제에 쓰여 있는지 확인해야합니다.

+0

GetOffsetShell 명령의 실제 출력을 표시 할 수 있습니까? – C4stor

+0

질문에 출력을 추가했습니다. –

+0

응용 프로그램 로그 파일의 count 값은 무엇입니까? 1m가 보이나요? – notionquest

답변

0

보내기 메시지가 비동기입니다. 모든 메시지가 처리되기 전에 오프셋을 확인할 수 있습니다.

+0

로그 보존은 24 시간입니다. 그리고 거의 4-5 분이 소요되는 메시지를 생성 한 직후 메시지를 확인하고 있습니다. –

+0

완전히 대답을 변경했습니다. –

+0

그래, 내 생각은 나쁘다고 생각하고 유지가 문제가되지 않을 것임을 깨달았습니다. 변경하기 전에 새로 고침하는 것을 잊었습니다. –