0

PySpark에서 브로드 캐스트 된 객체를 사용하는 UDF를 호출하려고합니다. 여기PySpark에서 브로드 캐스팅 된 객체를 사용하여 UDF를 호출 할 때 오류가 발생했습니다.

상황 및 오류를 재현 최소한의 예입니다

import pyspark.sql.functions as sf 
from pyspark.sql.types import LongType 


class SquareClass: 
    def compute(self, n): 
     return n ** 2 


square = SquareClass() 
square_sc = sc.broadcast(square) 

def f(n): 
    return square_sc.value.compute(n) 

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF() 
f_udf = sf.udf(f, LongType()) 

numbers.select(f_udf(numbers.id)).show(10) 

이 코드가 생성된다는 스택 추적 및 오류 메시지 :

Traceback (most recent call last) 
<ipython-input-75-6e38c014e4b2> in <module>() 
    13 f_udf = sf.udf(f, LongType()) 
    14 
---> 15 numbers.select(f_udf(numbers.id)).show(10) 

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    255   +---+-----+ 
    256   """ 
--> 257   print(self._jdf.showString(n, truncate)) 
    258 
    259  def __repr__(self): 

/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, 

<snip> 

An error occurred while calling o938.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
+0

추적을 제공하지 않았습니다. AttributeError 인 것 같습니다. – MaFF

답변

2

square_sc의 속성이있어 전화 작업자에게 존재하지 않는 모듈 SquareClass을 호출합니다.

당신이 UDF에 파이썬 패키지, 클래스, 기능을 사용하려면 당신에게 spark-submit를 실행하는 경우, 근로자는 파이썬 스크립트의 코드를 넣고 --py-files를 사용하여 배포하여이를 달성 할 수있는 액세스 권한이 할 수 있어야한다 pyspark

1

당신이 할 수있는 일은 클래스를 별도의 모듈로 유지하고 모듈을 sparkContext에 추가하는 것입니다.

class_module.py 

class SquareClass: 
    def compute(self, n): 
     return n ** 2 

pyspark-shell 

    import pyspark.sql.functions as sf 
    from pyspark.sql.types import LongType 
    from class_module import SquareClass 

    sc.addFile('class_module.py') 

    square = SquareClass() 
    square_sc = sc.broadcast(square) 
    def f(n): 
     return square_sc.value.compute(n) 

    f_udf = sf.udf(f, LongType()) 
    numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF() 
    numbers.select(f_udf(numbers.id)).show(10) 
    +-----+ 
    |f(id)| 
    +-----+ 
    | 0| 
    | 1| 
    | 4| 
    | 9| 
    | 16| 
    | 25| 
    | 36| 
    | 49| 
    | 64| 
    | 81| 
    +-----+