자바로 작성되고 Spark 2.1을 사용하는 Spark 스트리밍 앱이 있습니다. 카프카에서 메시지를 읽기 위해 KafkaUtils.createDirectStream
을 사용하고 있습니다. kafka 메시지에 kryo 인코더/디코더를 사용하고 있습니다.
Spark이 메시지를 마이크로 일괄 처리로 가져 오면 메시지가 kryo 디코더를 사용하여 성공적으로 디코딩됩니다. 그러나 나는 스파크 집행자가 kafka에서 읽은 각 메시지를 해독하기 위해 kryo 디코더의 새로운 인스턴스를 생성한다는 사실을 알아 냈습니다. 디코더 생성자 안에 로그를 넣어서 확인했습니다.
이것은 나에게 이상한 것 같습니다. 각 메시지 및 각 배치에 동일한 디코더 인스턴스를 사용해야합니까?카프카 다이렉트 스트림이 모든 메시지에 대해 새로운 디코더를 만드는 이유는 무엇입니까?
코드 나는 카프카에서 읽고 있습니다 여기서
우리는 스파크가 내부적으로 카프카에서 데이터를 가져 오는 방법을 보려면JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
매우 잘 조사되었습니다! #impressed –
@Yuval : 카프카 0.10.x를 사용하고 있습니다. Spark은 캐시 키가 소비자 ID, 주제 ID, 파티션 ID로 식별되는 캐시 된 kafka 소비자 (실행 프로그램 별)를 사용합니다. 카프카 파티션 당 디코더를 갖는 방법이나 Spark가 병렬로 메시지를 디코딩하는 방법이 있습니다. 내가 예상하는 바는 새로운 디코더가 캐시 된 소비자 내부의 파티션마다 한 번 만들어 져야한다는 것이다. 가벼운로드에서는이 문제를 볼 수 없지만 초당 1000 개의 메시지를 보낼 때만이 문제가 발생합니다. 아마도 저는 "GC"사이클을 실행하고있을 것입니다. KafkaRDD 클래스에서 로깅을 활성화하는 방법에 대해 알고 있습니까? – scorpio
@scorpio Kafka 0.10.x에는 디코더가 필요하지 않습니다. 그것은 기본'ConsumerRecord'를 리턴하고, 당신은 그것으로 무엇을 할 것인지를 선택합니다. 아마도'map' 내부에 디코더 인스턴스를 생성하고 있습니까? –