6

나는 Kafka에서 데이터 스트림을로드하기 위해 DataFrame/Dataset API를 기반으로하는 Spark-Streaming을 사용하여 structured streaming approach을 사용하려고합니다.구조화 된 스트리밍을 사용하여 Kafka에서 JSON 형식으로 레코드를 읽는 방법?

내가 사용

  • 스파크 2.10
  • 카프카 0.10
  • 스파크-SQL-카프카-0-10

스파크 카프카 데이터 소스에 정의되어 기본 스키마 :

|key|value|topic|partition|offset|timestamp|timestampType| 

내 데이터는 json 형식으로 제공됩니다. 열에 저장됩니다. 값을 열에서 기본 스키마를 추출하고 수신 된 데이터 프레임을 에 저장된 열로 업데이트하는 방법을 찾고 있습니까? 나는 아래의 방법을 시도했지만 작동하지 않습니다

여기에 내가 때문에 스트림의 창조의 시간에 예외 org.apache.spark.sql.AnalysisException: Can't extract value from value#337;을 얻고있다
val columns = Array("column1", "column2") // column names 
val rawKafkaDF = sparkSession.sqlContext.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers","localhost:9092") 
    .option("subscribe",topic) 
    .load() 
    val columnsToSelect = columns.map(x => new Column("value." + x)) 
    val kafkaDF = rawKafkaDF.select(columnsToSelect:_*) 

    // some analytics using stream dataframe kafkaDF 

    val query = kafkaDF.writeStream.format("console").start() 
    query.awaitTermination() 

, 내부에 알 수없는 값 ...

당신이 어떤 제안이 있습니까 ?

답변

6

Spark의 관점에서 value은 바이트 시퀀스입니다. 직렬화 형식이나 내용에 대해 알지 못합니다. 파일을 추출하려면 먼저 구문을 분석해야합니다.

데이터가 JSON 문자열로 직렬화되는 경우 두 가지 옵션이 있습니다. 경로에 의해 추출 필드 get_json_object를 사용

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.from_json 

val schema: StructType = StructType(Seq(
    StructField("column1", ???), 
    StructField("column2", ???) 
)) 

rawKafkaDF.select(from_json($"value".cast(StringType), schema)) 

또는 StringTypecast : 당신은 StringTypecastvaluefrom_json을 사용하여 스키마를 제공 할 수

import org.apache.spark.sql.functions.get_json_object 

val columns: Seq[String] = ??? 

val exprs = columns.map(c => get_json_object($"value", s"$$.$c")) 

rawKafkaDF.select(exprs: _*) 

나중에 원하는 유형 cast.