1

스키마를 만들고 프로듀서가 kafka로 보내려면 다음 코드를 사용합니다 (실제로는 아니지만 가정합니다).카프카에서 avro 스키마를 한 번만 보내는 방법

public static final String USER_SCHEMA = "{" 
     + "\"type\":\"record\"," 
     + "\"name\":\"myrecord\"," 
     + "\"fields\":[" 
     + " { \"name\":\"str1\", \"type\":\"string\" }," 
     + " { \"name\":\"str2\", \"type\":\"string\" }," 
     + " { \"name\":\"int1\", \"type\":\"int\" }" 
     + "]}"; 

public static void main(String[] args) throws InterruptedException { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 

    Schema.Parser parser = new Schema.Parser(); 
    Schema schema = parser.parse(USER_SCHEMA); 
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); 

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); 

    for (int i = 0; i < 1000; i++) { 
     GenericData.Record avroRecord = new GenericData.Record(schema); 
     avroRecord.put("str1", "Str 1-" + i); 
     avroRecord.put("str2", "Str 2-" + i); 
     avroRecord.put("int1", i); 

     byte[] bytes = recordInjection.apply(avroRecord); 

     ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes); 
     producer.send(record); 

     Thread.sleep(250); 

    } 

    producer.close(); 
} 

이 코드를 사용하면이 스키마가있는 메시지를 1 개만 보낼 수 있습니다. 그런 다음 스키마 이름을 변경하여 다음 메시지를 보내야합니다 ... 그래서 이름 문자열이 무작위로 생성되어 더 많은 메시지를 보낼 수 있습니다. 이것은 해킹이므로 적절한 방법을 알고 싶습니다.

나는 또한 스키마가없는 메시지를 보내는 방법을 살펴 봤다. (즉, 스키마와 함께 kafka에 이미 1 개의 메시지가 보내 졌으므로 이제 다른 모든 메시지에는 스키마가 필요하지 않음) - new GenericData.Record(..)은 스키마 매개 변수를 필요로한다. null의 경우, 에러가 슬로우됩니다.

카프카에게 아볼로 스키마 메시지를 보내는 올바른 방법은 무엇입니까? 여기

는 다른 코드 샘플입니다 - 내 꽤 동일 :
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java

또한 스키마를 설정하지 않고 보내는 방법을 표시하지 않습니다.

답변

1

내가 선 이해하지 못했다 : 예 모두에서

The thing is the code allows me to send only 1 message with this schema. Then I need to change the schema name in order to send the next message.

, 당신과 당신이 공급 합류 예, 스키마는 카프카에 전송되지 않습니다.

제공된 예제에서는 GenericRecord 객체를 만드는 데 사용 된 스키마입니다. 일부 스키마 (예를 들어 정수 int1 필드를 GenericRecord 객체 내에 만 넣을 수 있는지 확인)와 비교하여 레코드의 유효성을 검사하기 때문에 스키마를 제공합니다.

유일한 차이점은 데이터를 byte []로 직렬화하기로 결정했기 때문입니다.이 작업은 합법적 인 예에서 볼 수 있듯이이 책임을 KafkaAvroSerializer에 위임 할 수 있으므로 필요하지 않을 수 있습니다.

GenericRecord는 Avro 개체이며 Kafka의 시행이 아닙니다. 어떤 종류의 객체를 카프카에 보내고 싶다면 (또는 스키마없이) 카피 카 객체를 byte []로 변환하고 생성자가 생성하고자하는 프로퍼티에이 serializer를 설정하면된다. 생산자.

일반적으로 Avro 메시지 자체를 사용하여 스키마에 대한 포인터를 보내는 것이 좋습니다. 다음 링크에서 추론을 찾을 수 있습니다 : http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/

+0

감사합니다. 나는 카프카에서 hdfs로 편지를 쓰고 싶습니다. 내가 봤던 것에서는 카프카에 avro 형식의 데이터가 있어야합니다. 그렇지 않으면 hdfs는 아무 것도 쓰지 않습니다. 이제는 카프카에 avro 형식으로 데이터를 쓰려고합니다. 한 번 스키마를 등록 (또는 전혀)하지 않고 메시지를 보내려고합니다. 이 샘플 코드가 있습니까? 카프카에 데이터를 보내기 전에 먼저 스키마 서버에 스키마를 등록해야합니까? 위의 코드와 같이 메시지에 스키마를 추가하지 않고 avro 형식의 카프카에 데이터를 보낼 수 있습니까? – Adrian

+0

메시지를 보낼 때마다 스키마 이름을 변경해야한다는 점을 잊어 버리십시오. 그렇지 않으면 오류가 발생합니다 : 'Can not redefine : myrecord'. 이거 버그 야? – Adrian

+0

HDFS는 파일 시스템 일뿐입니다. HDFS에 데이터를 쓰는 방법과 데이터를 선택하는 방법을 선택하십시오. 카프카에서 HDFS로 데이터를 어떻게 이동시킬 계획입니까? 당신은 그것을 책임질 프로세스가 필요합니다. 사용하려고 계획 한 프로세스가 Avro의 사용을 강요하지만 HDFS의 전제 조건은 아닙니다. 다양한 형식으로 데이터를 쓸 수 있습니다. 문자열을 Kafka에 쓰는 예제를 볼 수 있습니다. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 나중에 해당 문자열을 소비자에게 쓰고 HDFS API를 사용하는 HDFS. –