2017-12-15 18 views
6

자바로 작성되고 Spark 2.1을 사용하는 Spark 스트리밍 앱이 있습니다. 카프카에서 메시지를 읽기 위해 KafkaUtils.createDirectStream을 사용하고 있습니다. kafka 메시지에 kryo 인코더/디코더를 사용하고 있습니다.
Spark이 메시지를 마이크로 일괄 처리로 가져 오면 메시지가 kryo 디코더를 사용하여 성공적으로 디코딩됩니다. 그러나 나는 스파크 집행자가 kafka에서 읽은 각 메시지를 해독하기 위해 kryo 디코더의 새로운 인스턴스를 생성한다는 사실을 알아 냈습니다. 디코더 생성자 안에 로그를 넣어서 확인했습니다.

이것은 나에게 이상한 것 같습니다. 각 메시지 및 각 배치에 동일한 디코더 인스턴스를 사용해야합니까?카프카 다이렉트 스트림이 모든 메시지에 대해 새로운 디코더를 만드는 이유는 무엇입니까?

코드 나는 카프카에서 읽고 있습니다 여기서

우리는 스파크가 내부적으로 카프카에서 데이터를 가져 오는 방법을 보려면
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

답변

3

, 우리는 모든 RDD 구현하는 방법 인 KafkaRDD.compute보고해야하는

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

이 여기서 중요한 것은 KafkaRDDIterator 생성 else 절입니다 : 잘, RDD 것을 계산하는 방법을 프레임 워크를 알려줍니다. 이 내부적으로 가지고 : 당신이 보는대로, 각 기본 파티션를 들어, 키 디코더 및 반사를 통해 값 디코더 모두의 인스턴스를 생성

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

합니다. 즉, 메시지가 생성되지 않습니다. 메시지 당 하지만 은 카프카 파티션입니다.

왜 이런 방식으로 구현 되나요? 나는 모른다. 키와 값 디코더가 Spark 내부에서 일어나는 다른 모든 할당과 비교할 때 무시할만한 성능을 발휘해야하기 때문에 나는 추측하고 있습니다.

앱을 프로파일 링하여이를 할당 핫 경로로 발견하면 문제를 열 ​​수 있습니다. 그렇지 않으면 걱정할 필요가 없습니다.

+0

매우 잘 조사되었습니다! #impressed –

+0

@Yuval : 카프카 0.10.x를 사용하고 있습니다. Spark은 캐시 키가 소비자 ID, 주제 ID, 파티션 ID로 식별되는 캐시 된 kafka 소비자 (실행 프로그램 별)를 사용합니다. 카프카 파티션 당 디코더를 갖는 방법이나 Spark가 병렬로 메시지를 디코딩하는 방법이 있습니다. 내가 예상하는 바는 새로운 디코더가 캐시 된 소비자 내부의 파티션마다 한 번 만들어 져야한다는 것이다. 가벼운로드에서는이 문제를 볼 수 없지만 초당 1000 개의 메시지를 보낼 때만이 문제가 발생합니다. 아마도 저는 "GC"사이클을 실행하고있을 것입니다. KafkaRDD 클래스에서 로깅을 활성화하는 방법에 대해 알고 있습니까? – scorpio

+0

@scorpio Kafka 0.10.x에는 디코더가 필요하지 않습니다. 그것은 기본'ConsumerRecord'를 리턴하고, 당신은 그것으로 무엇을 할 것인지를 선택합니다. 아마도'map' 내부에 디코더 인스턴스를 생성하고 있습니까? –