0

에 알 수없는 마법 바이트는 카프카 스트림과 브로 데이터를 스트리밍 할 때,이 오류를 건너 왔어요 거기에 언급 된 해결책 중 어느 것도 문제를 해결하지 못했습니다. 바라건대 여기서 해결책을 찾을 수 있습니다. 다음과 같이카프카 스트림은 GenericAvroSerde

내 설정은 같습니다
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)

이미 메일 링의 솔루션으로 VALUE_SERDE 같은에 KEY_SERDE를 설정했지만, 심지어이 비록 "표시"했다 목록에서, 그것은 내 경우에는 작동하지 않았다. 다음과 같이

내 스키마와 GenericData.Record 발생 해요 :
val record = new GenericData.Record(schema) ... record.put(field, value)

내가 디버그 모드를 시작하고 생성 된 기록을 확인할 때, 모든 것이 잘 보인다는 레코드의 데이터가 및 매핑이 올바른 것입니다.

나는이 같은 KStream이 (내가 전에 분기를 사용) 스트리밍 : 나는 기록을 위해 GenericData.Record을 사용하고
splitTopics.get(0).to(s"${destTopic}_Testing")

. GenericAvroSerde과 함께이 문제가 발생할 수 있습니까?

고맙습니다.
모든 새해 복 많이 받으세요!

+0

전반적인 설정은 무엇입니까? 단일 주제? 콘솔 소비자로 데이터를 읽을 수 있습니까? 이 예제가 도움이 될 수도 있습니다. https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java –

+0

입력 항목에 있습니다. 원시 텍스트 (Kafka Streams에서 파싱 됨). 그리고 나서'GenericData.Record' 레코드에 매핑됩니다. 출력 주제에는 Avro 데이터가 있습니다. 콘솔 사용자와 함께 입력 항목의 메시지를 읽을 수 있습니다. 디버그 모드에서 시작하면,'streams.to' 호출이 Avro 레코드로 전송되기 전에 레코드를 볼 수 있습니다. 단일 입력 항목이지만 여러 출력 항목 (순간 4)입니다. 출력 항목이 아직 존재하지 않습니다 –

+0

매직 바이트는 합류하는 클라이언트가 Avro를 사용하여 일련 화 된 메시지 앞에 마커의 일종으로 추가 한 바이트입니다.이 오류는 Confluent 클라이언트에서 일부 메시지를 deserialize하려고 시도했지만 Confluent 클라이언트에서 Avro를 사용하여 메시지를 직렬화하지 않았을 때, 합류하고 바닐라 카프카 클라이언트를 혼합하고 있습니까? –

답변

0

입력 문제에서 얻은 String 값을 deserialize 한 후 VALUE_SERDE을 교환하는 것이 해결책입니다. Serde 이후

는 직렬화 및 역 직렬화의 결합 "요소"입니다, 나는 단순히 StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]를 사용하지만 출력 항목에 쓰는하기 위해 AvroSerde를 사용하는 경우에만 다음 입력 레코드를 직렬화 복원 및 대한 StringSerde를 사용할 필요가 없습니다.
는 이제 다음과 같습니다

// default streams configuration serdes are different from the actual output configurations 
val streamsConfiguration: Properties = { 
    val p = new Properties() 
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID")) 
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG")) 
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG")) 
    p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
    p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
    p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG")) 
    p 
} 

// adjusted output serdes for avro records 
val keySerde: Serde[String] = Serdes.String 
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde() 
valSerde.configure(
    Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG) 
), 
    /* isKeySerde = */ false 
) 

// Now using the adjusted serdes to write to output like this 
stream.to(keySerde, valSerde, "destTopic") 

이 방법은, 그것은 매력처럼 작동합니다.
감사합니다.