스키마를 만들고 프로듀서가 kafka로 보내려면 다음 코드를 사용합니다 (실제로는 아니지만 가정합니다).카프카에서 avro 스키마를 한 번만 보내는 방법
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
이 코드를 사용하면이 스키마가있는 메시지를 1 개만 보낼 수 있습니다. 그런 다음 스키마 이름을 변경하여 다음 메시지를 보내야합니다 ... 그래서 이름 문자열이 무작위로 생성되어 더 많은 메시지를 보낼 수 있습니다. 이것은 해킹이므로 적절한 방법을 알고 싶습니다.
나는 또한 스키마가없는 메시지를 보내는 방법을 살펴 봤다. (즉, 스키마와 함께 kafka에 이미 1 개의 메시지가 보내 졌으므로 이제 다른 모든 메시지에는 스키마가 필요하지 않음) - new GenericData.Record(..)
은 스키마 매개 변수를 필요로한다. null의 경우, 에러가 슬로우됩니다.
카프카에게 아볼로 스키마 메시지를 보내는 올바른 방법은 무엇입니까? 여기
는 다른 코드 샘플입니다 - 내 꽤 동일 :https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java
또한 스키마를 설정하지 않고 보내는 방법을 표시하지 않습니다.
감사합니다. 나는 카프카에서 hdfs로 편지를 쓰고 싶습니다. 내가 봤던 것에서는 카프카에 avro 형식의 데이터가 있어야합니다. 그렇지 않으면 hdfs는 아무 것도 쓰지 않습니다. 이제는 카프카에 avro 형식으로 데이터를 쓰려고합니다. 한 번 스키마를 등록 (또는 전혀)하지 않고 메시지를 보내려고합니다. 이 샘플 코드가 있습니까? 카프카에 데이터를 보내기 전에 먼저 스키마 서버에 스키마를 등록해야합니까? 위의 코드와 같이 메시지에 스키마를 추가하지 않고 avro 형식의 카프카에 데이터를 보낼 수 있습니까? – Adrian
메시지를 보낼 때마다 스키마 이름을 변경해야한다는 점을 잊어 버리십시오. 그렇지 않으면 오류가 발생합니다 : 'Can not redefine : myrecord'. 이거 버그 야? – Adrian
HDFS는 파일 시스템 일뿐입니다. HDFS에 데이터를 쓰는 방법과 데이터를 선택하는 방법을 선택하십시오. 카프카에서 HDFS로 데이터를 어떻게 이동시킬 계획입니까? 당신은 그것을 책임질 프로세스가 필요합니다. 사용하려고 계획 한 프로세스가 Avro의 사용을 강요하지만 HDFS의 전제 조건은 아닙니다. 다양한 형식으로 데이터를 쓸 수 있습니다. 문자열을 Kafka에 쓰는 예제를 볼 수 있습니다. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 나중에 해당 문자열을 소비자에게 쓰고 HDFS API를 사용하는 HDFS. –