2017-12-22 26 views
1

Dataframe 내부에서 호출하는 UDF가 있지만 undefined udf가 발생합니다. expr 함께 사용될정의되지 않은 함수 UDF가 pyspark에 있습니까?

global ac 
ac = sc.accumulator(0) 

def incrementAC(): 
    ac.add(1) 
    return str(ac.value) 

df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"]) 

df.withColumn("lang_and_rank", expr("concat(language,'blah')")).show() 

+--------+----+-------------+ 
|language|rank|lang_and_rank| 
+--------+----+-------------+ 
| Java| 90|  Javablah| 
| Scala| 95| Scalablah| 
| Spark| 92| Sparkblah| 
+--------+----+-------------+ 

myudf = udf(incrementAC,StringType()) 
df.withColumn("lang_and_rank", expr("concat(language,myudf())")).show() 

.utils.AnalysisException: u'undefined function myudf;' 

답변

2

기능 등록되어야한다 : 변환에서 사용

또한 accumualtors
spark.udf.register("incrementAC", incrementAC) 

신뢰성 아니다.

1

희망이 도움이됩니다.

from pyspark.sql.functions import udf, expr, concat, col 
from pyspark.sql.types import StringType 

ac = sc.accumulator(0) 

def incrementAC(): 
    ac.add(1) 
    return str(ac) 

#sample data 
df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"]) 

접근법 1 :

#solution using usual udf definition 
myudf = udf(incrementAC, StringType()) 
df.withColumn("lang_and_rank", concat(col('language'), myudf())).show() 

접근법 2 :

#another solution if you want to use 'expr' (as rightly pointed out by @user9132725) 
sqlContext.udf.register("myudf", incrementAC, StringType()) 
df = df.withColumn("lang_and_rank", expr("concat(language, myudf())")) 
df.show() 

출력된다 :

+--------+----+-------------+ 
|language|rank|lang_and_rank| 
+--------+----+-------------+ 
| Java| 90|  Java1| 
| Scala| 95|  Scala1| 
| Spark| 92|  Spark2| 
+--------+----+-------------+