2017-12-28 47 views
-1

저는 UDF를 만들었고 조인 내부에서 병합 결과에 적용하려고합니다. 는 이상적으로는 가입 기간 동안이 일을하고 싶습니다 :"Task not serializable"예외로 UDF가있는 쿼리가 실패하는 이유는 무엇입니까?

def foo(value: Double): Double = { 
    value/100 
} 

val foo = udf(foo _) 

df.join(.....) 
    .withColumn("value",foo(coalesce(new Column("valueA"), new Column("valueB")))) 

하지만 예외 Task not serializable을 얻고있다. 해결 방법이 있습니까?

답변

1

람다 함수를 사용하여 직렬화 할 수 있습니다. 이 예제는 잘 동작합니다.

import org.apache.spark.sql.functions.col 
import org.apache.spark.sql.functions.coalesce 
import org.apache.spark.sql.functions.udf 
val central: DataFrame = Seq(
    (1, Some(2014)), 
    (2, null) 
).toDF("key", "year1") 

val other1: DataFrame = Seq(
    (1, 2016), 
    (2, 2015) 
).toDF("key", "year2") 
def fooUDF = udf{v: Double => v/100} 

val result = central.join(other1, Seq("key")) 
    .withColumn("value",fooUDF(coalesce(col("year1"), col("year2")))) 
0

하지만 예외 Task not serializable을 얻고있다.

악명 "작업 직렬화 없음"예외에 대한 이유는 def foo(value: Double): Double (간접적 unserializable SparkContext를 참조 아마도 SparkSession있는) unserializable 소유 개체의 일부이다.

해결 방법은 직렬화 할 수없는 값에 대한 참조가없는 "독립형"개체의 일부로 메서드를 정의하는 것입니다.

해결 방법이 있습니까?

@firas로 other answer을 참조하십시오.