2017-10-05 6 views
1

저는 Schema Registry를 사용하여 avro 토픽에서 데이터를 읽고 간단한 변환을 수행하고 결과를 콘솔에 인쇄하는 Java 애플리케이션을 개발했습니다. 기본적으로 키와 값에 대해 GenericAvroSerde 클래스를 사용했습니다. 모든 것은 내가 그없이Kafka Streams가 스키마없이 avro 토픽을 생성했습니다.

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url")); 
    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde(); 
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde(); 
    keyGenericAvroSerde.configure(serdeConfig, true); 
    valueGenericAvroSerde.configure(serdeConfig, false); 

처럼 각 serde에 대한 추가 구성을 정의해야한다고 항상 같은 오류 얻을 제외하고 잘 작동 :

Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940 
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69 
Caused by: java.lang.NullPointerException 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 

글쎄, 그것은 그 후, unsual하지만 괜찮다고 (위에 게시 한대로 구성 호출을 추가했을 때) - 작동하고 응용 프로그램이 모든 작업을 수행하고 결과를 인쇄 할 수있었습니다.

하지만! 새로운 주제에 데이터를 게시하기 위해 call through()를 사용하려고했을 때 내가 묻는 문제에 직면했습니다 : 주제가 스키마없이 만들어졌습니다. 어떻게 될 수 있습니까 ???

재미있는 사실은 데이터가 기록되고 있음을, 그러나입니다 가) 바이너리 형식으로, 너무 간단 소비자가 B를 읽을 수 있습니다) 그렇지 스키마가 - 그래서 아 브로 소비자가 그 중 하나를 읽을 수 있습니다 : 물론

Processed a total of 1 messages 
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) 
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114) 
     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140) 
     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) 
     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) 
     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) 
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114) 
     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140) 
     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) 
     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) 
     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 

나는 주제에 대한 스키마 레지스트리를 체크 아웃 :

curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions 
{"error_code":40401,"message":"Subject not found."} 

그러나 자바 응용 프로그램에 의해 작성된 다른 주제에 동일한 호출 - 초기 데이터의 생산자가 스키마가 존재한다는 것을 보여줍니다

0 주제는 단순 소비자로 읽을 수 있습니다, 데이터가 기록 생성되지만 이진 스키마가 존재하지 않습니다 -
curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions 
[1] 

두 응용 프로그램은 단지 요약 동일한 "schema.registry.url"구성 를 사용합니다.

또한 데이터를 일치시키기 위해 어떻게 든 Landoop을 사용하여 스키마를 만들려고했지만 성공하지 못했습니다. 그런데 카프카 스트림을 사용하는 적절한 방법이 아닙니다. 모든 것을 즉시 수행해야합니다.

도움말, 제발!

+0

어떤 버전을 사용하십니까? 또한 기본값으로'StreamsConfig'에 AvroSerde를 설정하거나 각 운영자에게 개별적으로 설정합니까? 주제를 수동으로 작성하고 응용 프로그램을 시작하기 전에 작성 했습니까? 또한이 예제를 확인하십시오 : https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java#L83- L85 –

+0

Confluent 3.3.0, Java 1.8, kafka 0.11.0.0-cp1, avro 버전 1.7.7을 사용합니다. GenericAvroSerde를 기본값으로 설정하지만, 간단한 유형의 경우에는이 설정 (Serdes.Long, Serdes.String, Serdes.Float)을 무시합니다. 내가 사용하고자하는 주제는 존재하지 않지만, 시작 부분에 쓴 것처럼 데이터가 작성되는 동안 작성되었습니다. –

+0

그리고이 예제에 관해서는 - 예제의 키가 bytearray로 디코딩되는 것을 제외하고는 똑같습니다. 왜냐하면 (왜냐하면 키의 avro 스키마는 단지 "string"이기 때문에) stringSerde를 사용하는 동안. 스키마 regstry url은 kafka 스트림 앱의 초기 데이터를 읽을 수 없다면 사용할 수 있습니다.Final Stream.print() –

답변

0

through이 호출되면 사용자가 특별히 무시하지 않는 한 StreamsConfig을 통해 정의 된 기본 serde가 사용됩니다. 어떤 기본 serde를 사용 했습니까? 당신이 자동으로 주제를 통해 스키마를 등록 할 AbstractKafkaAvroSerializer를 사용해야합니다.