2017-02-10 11 views
2

특정 형식을 사용하여 Avro Serializer와 함께 Apache Kafka를 사용하고 있습니다. 내 자신의 사용자 지정 클래스를 만들고 카프카 메시지 값으로 사용하려고합니다.Kafka Avro 시리얼 화기 : org.apache.avro.AvroRuntimeException : 열지 않음

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open 
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82) 
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287) 
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

내 브로 스키마는 다음과 같이 파일 :

{ 
    "namespace": "customer.avro", 
    "type": "record", 
    "name": "Customer", 
    "fields": [{ 
     "name": "id", 
     "type": "int" 
    }, { 
     "name": "name", 
     "type": "string" 
    }] 
} 

고객 등급 :

public class Customer { 
    public int id; 
    public String name; 

    public Customer() { 
    } 

    public Customer(int id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 
} 

데이터 직렬화가 아 브로를 사용하여 내가 메시지를 보내려고하고 있어요 그러나 나는 다음과 같은 예외를 얻고있다 :

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) { 
     kafkaProducer.send(record); 
    } 

Customer customer1 = new Customer(1001, "James"); 

Parser parser = new Parser(); 
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro")); 

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer); 
dataFileWriter.append(customer1); 
dataFileWriter.close(); 

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry", 
     "Customer One", dataFileWriter 
); 
fireAndForget(record1); 

일반적인 하나 대신 SpecificDatumWriter 라이터를 사용하고 싶습니다. 이 오류는 무엇과 관련이 있습니까?

답변

1

카프카가 serialize 할 키 값 쌍을 받으면 serialize하려는 값이 아닌 DataFileWriter을 전달합니다.이 값은 작동하지 않습니다.

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
ByteArrayOutputStream os = new ByteArrayOutputStream(); 

try { 
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
    writer.write(customer1, encoder); 
    e.flush(); 

    byte[] avroBytes = os.toByteArray(); 
    ProducerRecord<String, byte[]> record1 = 
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes); 

    kafkaProducer.send(record1); 
} finally { 
    os.close(); 
} 
+0

헤이 @Yuval이 솔루션에 따라, 나는 점점 오전 : 당신이해야 할 일은

BinaryEncoderByteArrayOutputStream 비아 직렬화 브로와 바이트 배열을 만든 다음 ProducerRecord<String, byte[]>에 전달입니다 '스레드 "main"예외 java.lang.ClassCastException : com.harmeetsingh13.java.producers.avroserializer.Customer는 org.apache.avro.generic.IndexedRecord' 예외로 캐스팅 될 수 없습니다. 예외 –

+0

@HarmeetSinghTaara 글쎄, 그건 또 다른 문제입니다. 당신은'Customer'의 정의와 전체 스택 추적을 포함하는 다른 질문을 게시합니다. 직렬화하려는 유형에 필요한'ISpecificRecord' 인터페이스를 구현하지 않는 것 같습니다. 그렇지 않으면 대신'GenericDatumWriter'를 사용할 수 있습니다. –

+0

오케이 @Yuval,하지만이 솔루션에 따르면 우리는 데이터를 바이트로 변환해야합니다. 그 다음에 우리는 kafka에게 메시지를 보냅니다. 우리 스키마를 avro에 지정할 수 없으며 avro가 자동으로 모든 것을 처리합니까? –