2017-10-19 16 views
1

스파크를 처음 접했습니다. 나는 구조화 된 스트리밍을 사용하여 카프카에서 데이터를 읽습니다.스파크 (2.2) : 구조화 된 스트리밍을 사용하여 카프카에서 디 리얼 리얼 기록

나는 스칼라에서이 코드를 사용하여 데이터를 읽을 수 있습니다 : 값 열에서

val data = spark.readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", brokers) 
     .option("subscribe", topics) 
     .option("startingOffsets", startingOffsets) 
     .load() 

내 데이터를 드리프트 기록이다. 스트리밍 API는 이진 형식의 데이터를 제공합니다. 데이터를 문자열 또는 json으로 캐스팅하는 예제가 있지만 데이터를 비 변환으로 변환하는 방법에 대한 예는 찾을 수 없습니다.

어떻게하면됩니까?

답변

0

나는이 블로그를 databricks 웹 사이트에서 발견했습니다. Apache Kafka의 복잡한 데이터 스트림을 소비하고 변환하기 위해 Spark SQL의 API를 활용하는 방법을 보여줍니다.

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

이 UDF는 디시리얼라이저 행에 사용할 수있는 방법을 설명하는 부분이 있습니다 : 나는 자바를 사용하기 때문에 다음 샘플 UDF를 작성했다하고

object MyDeserializerWrapper { 
    val deser = new MyDeserializer 
} 
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
    MyDeserializerWrapper.deser.deserialize(topic, bytes) 
) 

df.selectExpr("""deserialize("topic1", value) AS message""") 

, 어떻게 할 수 확인 다음과 같이

UDF1<byte[], String> mode = new UDF1<byte[], String>() { 
      @Override 
      public String call(byte[] bytes) throws Exception { 
       String s = new String(bytes); 
       return "_" + s; 
      } 
     }; 

지금 내가 예를 계산 구조화 된 스트리밍 말씀이 UDF를 사용할 수 있습니다 : 자바에서 호출 할 수

Dataset<String> words = df 
       //converted the DataFrame to a Dataset of String using .as(Encoders.STRING()) 
//    .selectExpr("CAST(value AS STRING)") 
       .select(callUDF("mode", col("value"))) 
       .as(Encoders.STRING()) 
       .flatMap(
         new FlatMapFunction<String, String>() { 
          @Override 
          public Iterator<String> call(String x) { 
           return Arrays.asList(x.split(" ")).iterator(); 
          } 
         }, Encoders.STRING()); 

나를위한 다음 단계는 절약 용 비 직렬화를위한 UDF를 작성하는 것입니다. 나는 그것이 끝나자 마자 그것을 게시 할 것이다.