2017-12-15 19 views
0

나는 카프카 제작자를 만들고 다른 방법을 호출하는 아래에 언급 된 여러 모델을 가지고 있지만 흐름을 중단해서는 안되며 성능에 영향을 미치지 않도록 프로그램하는 올바른 방법이 무엇인지 확실하지 않습니다. 친절하게 도와주세요.카프카 프로듀서를 만들고 send(), flush() 및 close() 메서드를 호출 할 때 올바른 순서는 무엇입니까?

모델 1 :

for(int i=1; i < 100; i++){ 
    Producer<String, String> producer = new KafkaProducer<String, String>(props); 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
    producer.close(); 
} 

모델 2 :

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i=1; i < 100; i++) { 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
    producer.close(); 
} 

모델 3 :

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i=1; i < 100; i++){ 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
} 
producer.close(); 

모델 4 :

for(int i=1; i < 100; i++){ 
    Producer<String, String> producer = new KafkaProducer<String, String>(props); 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
    producer.flush(); 
    producer.close(); 
} 

모델 5 :

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i=1; i < 100; i++){ 
    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
    producer.flush(); 
    producer.close(); 
} 

모델 6 :

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i=1; i < 100; i++){ 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
     topicName, 
     String.valueOf(i) 
    ); 

    producer.send(data); 
    producer.flush(); 
} 
producer.close(); 
+0

* 그래서 흐름이 중단되지해야하며, 성능이 영향을받지 않습니다 * 어느 플러시 설정할 수 있습니다 우리입니다. 평가 기준에 가장 부합되는 제품을 찾으려고 노력해야합니다. –

+0

@ElliottFrisch 여기에 전시 한 모든 모델을 시도해 보았습니다. 정상적으로 작동하는 모델은 거의 없었으며 모델의 나머지 부분은 IllegalStateException을 반환했습니다. 그래서 의심의 여지가 명확 해 지도록 내 머리 속에 문제가 발생했습니다. 또한 제한된 양의 데이터 만 처리 할 수 ​​있기 때문에 원활하게 실행되는 모델의 성능 효과가 있는지 확실하지 않습니다. – mannedear

답변

2

모델 (3) 아래의 예를 사용하여 변화를

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
    try { 
     for (int i = 1; i < 100; i++) { 
      ProducerRecord<String, String> data = new ProducerRecord<String, String>(topicName, String.valueOf(i)); 
      producer.send(data); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 
     producer.close(); 
    } 
1

을 다음과 같이 정확해야 보인다

Properties props = new Properties(); 
props.put("batch.size", 16384); 
props.put("buffer.memory", 33554432); 
Producer<String, String> producer = new KafkaProducer<>(props); 
for (int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 

producer.close(); 

알려진대로, send() 메소드는 비동기식입니다. 호출되면 보류중인 레코드 전송 버퍼에 레코드를 추가하고 즉시 반환합니다. 이를 통해 생산자는 효율성을 위해 개별 레코드를 함께 배치 할 수 있습니다.

그리고 우리는 buffer.memory 또는 batch.size 자동