2017-11-03 5 views
1

SparseVector이 있다고 가정하고 그 값을 더하고 싶습니다.Pyspark : SparseVector와의 합계 오류

v = SparseVector(15557, [3, 40, 45, 103, 14356], np.ones(5)) 
v.values.sum() 

5.0 

이것은 잘 작동합니다. 이제 나는 을 사용하여 동일한 작업을 수행하려고합니다. 의 열이있는 DataFrame이 있기 때문입니다. 여기서 이해할 수없는 오류가 발생합니다.

from pyspark.sql import functions as f 

def sum_vector(vector): 
    return vector.values.sum() 

sum_vector_udf = f.udf(lambda x: sum_vector(x)) 

sum_vector_udf(v) 

---- 

AttributeError       Traceback (most recent call last) 
<ipython-input-38-b4d44c2ef561> in <module>() 
     1 v = SparseVector(15557, [3, 40, 45, 103, 14356], np.ones(5)) 
     2 
----> 3 sum_vector_udf(v) 
     4 #v.values.sum() 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/functions.py in wrapper(*args) 
    1955   @functools.wraps(f) 
    1956   def wrapper(*args): 
-> 1957    return udf_obj(*args) 
    1958 
    1959   wrapper.func = udf_obj.func 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/functions.py in __call__(self, *cols) 
    1916   judf = self._judf 
    1917   sc = SparkContext._active_spark_context 
-> 1918   return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) 
    1919 
    1920 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _to_seq(sc, cols, converter) 
    58  """ 
    59  if converter: 
---> 60   cols = [converter(c) for c in cols] 
    61  return sc._jvm.PythonUtils.toSeq(cols) 
    62 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in <listcomp>(.0) 
    58  """ 
    59  if converter: 
---> 60   cols = [converter(c) for c in cols] 
    61  return sc._jvm.PythonUtils.toSeq(cols) 
    62 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _to_java_column(col) 
    46   jcol = col._jc 
    47  else: 
---> 48   jcol = _create_column_from_name(col) 
    49  return jcol 
    50 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _create_column_from_name(name) 
    39 def _create_column_from_name(name): 
    40  sc = SparkContext._active_spark_context 
---> 41  return sc._jvm.functions.col(name) 
    42 
    43 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args) 
    1122 
    1123  def __call__(self, *args): 
-> 1124   args_command, temp_args = self._build_args(*args) 
    1125 
    1126   command = proto.CALL_COMMAND_NAME +\ 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in _build_args(self, *args) 
    1092 
    1093   args_command = "".join(
-> 1094    [get_command_part(arg, self.pool) for arg in new_args]) 
    1095 
    1096   return args_command, temp_args 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in <listcomp>(.0) 
    1092 
    1093   args_command = "".join(
-> 1094    [get_command_part(arg, self.pool) for arg in new_args]) 
    1095 
    1096   return args_command, temp_args 

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_command_part(parameter, python_proxy_pool) 
    287    command_part += ";" + interface 
    288  else: 
--> 289   command_part = REFERENCE_TYPE + parameter._get_object_id() 
    290 
    291  command_part += "\n" 

AttributeError: 'SparseVector' object has no attribute '_get_object_id' 

저는 실제로 두 가지 다른 방식으로 똑같은 것을 쓰고 있습니다. 어떤 팁?

답변

0

select 또는 withColumn을 사용하여 udf의 데이터 프레임 열을 전달하십시오.

import pyspark.sql.functions as F 
df = df.withColumn('new_column_name', sum_vector_udf(F.col('column_name_of_sparse_vectors')) 

또는

df = df.select([sum_vector_udf(F.col('column_name_of_sparse_vectors').alias('new_column_name')]) 

첫 번째 방법은 새로운 항목을 추가되고 두 ​​번째 방법은 새로운 dataframe 동일한 열을 대체 할 것이다.

+0

도움을 주셔서 감사합니다. 두 가지 해결책을 모두 시도했지만 항상 내 df와 동일한 오류가 발생합니다. "ClassDict (numpy.dtype) 생성시 예상되는 제로 인수"입니다. df 밖에서 udf를 실행하면 위의 내용을 얻을 수 있습니다. –

+0

udf는 numpy dtype을 허용하지 않습니다. 스파 스 벡터를 (SparseVector (15557, [3, 40, 45, 103, 14356], np.ones (5)))로 작성하십시오. –

+0

이 구문이 정확히 무엇을 의미하는지 알지 못합니다 ... 시도했지만, 여전히 위와 같은 오류가 발생합니다. numpy dtype에 관해서는 @ user6910411 답변을 작성하고 있습니다.) –

0

udf은 NumPy 형식을 반환 형식으로 지원하지 않기 때문에 이런 현상이 발생합니다.

>>> type(v.values.sum()) 
<class 'numpy.float64'> 

당신은 표준 파이썬 형식으로 결과를 캐스팅해야한다 : 두 경우 모두

df = spark.createDataFrame([(v,)], ["v"]) 

@udf("double") 
def sum_vector(vector): 
    return vector.values.sum().tolist() 

또는

@udf("double") 
def sum_vector(vector): 
    return float(vector.values.sum()) 

당신거야 예상 결과를 얻을 :

df.select(sum_vector("v")).show() 
+-------------+ 
|sum_vector(v)| 
+-------------+ 
|   5.0| 
+-------------+