2
특정 형식을 사용하여 Avro Serializer와 함께 Apache Kafka를 사용하고 있습니다. 내 자신의 사용자 지정 클래스를 만들고 카프카 메시지 값으로 사용하려고합니다.Kafka Avro 시리얼 화기 : org.apache.avro.AvroRuntimeException : 열지 않음
Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
내 브로 스키마는 다음과 같이 파일 :
{
"namespace": "customer.avro",
"type": "record",
"name": "Customer",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": "string"
}]
}
고객 등급 :
public class Customer {
public int id;
public String name;
public Customer() {
}
public Customer(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
데이터 직렬화가 아 브로를 사용하여 내가 메시지를 보내려고하고 있어요 그러나 나는 다음과 같은 예외를 얻고있다 :
public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
kafkaProducer.send(record);
}
Customer customer1 = new Customer(1001, "James");
Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();
ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
"Customer One", dataFileWriter
);
fireAndForget(record1);
일반적인 하나 대신 SpecificDatumWriter
라이터를 사용하고 싶습니다. 이 오류는 무엇과 관련이 있습니까?
헤이 @Yuval이 솔루션에 따라, 나는 점점 오전 : 당신이해야 할 일은
는
BinaryEncoder
및ByteArrayOutputStream
비아 직렬화 브로와 바이트 배열을 만든 다음ProducerRecord<String, byte[]>
에 전달입니다 '스레드 "main"예외 java.lang.ClassCastException : com.harmeetsingh13.java.producers.avroserializer.Customer는 org.apache.avro.generic.IndexedRecord' 예외로 캐스팅 될 수 없습니다. 예외 –@HarmeetSinghTaara 글쎄, 그건 또 다른 문제입니다. 당신은'Customer'의 정의와 전체 스택 추적을 포함하는 다른 질문을 게시합니다. 직렬화하려는 유형에 필요한'ISpecificRecord' 인터페이스를 구현하지 않는 것 같습니다. 그렇지 않으면 대신'GenericDatumWriter'를 사용할 수 있습니다. –
오케이 @Yuval,하지만이 솔루션에 따르면 우리는 데이터를 바이트로 변환해야합니다. 그 다음에 우리는 kafka에게 메시지를 보냅니다. 우리 스키마를 avro에 지정할 수 없으며 avro가 자동으로 모든 것을 처리합니까? –