나는이 블로그를 databricks 웹 사이트에서 발견했습니다. Apache Kafka의 복잡한 데이터 스트림을 소비하고 변환하기 위해 Spark SQL의 API를 활용하는 방법을 보여줍니다.
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
이 UDF는 디시리얼라이저 행에 사용할 수있는 방법을 설명하는 부분이 있습니다 : 나는 자바를 사용하기 때문에 다음 샘플 UDF를 작성했다하고
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
, 어떻게 할 수 확인 다음과 같이
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
지금 내가 예를 계산 구조화 된 스트리밍 말씀이 UDF를 사용할 수 있습니다 : 자바에서 호출 할 수
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select(callUDF("mode", col("value")))
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
나를위한 다음 단계는 절약 용 비 직렬화를위한 UDF를 작성하는 것입니다. 나는 그것이 끝나자 마자 그것을 게시 할 것이다.