3 개의 다른 PySpark DataFrames를 기반으로 계산을 수행하고 있습니다.Pyspark의 새로운 DataFrame에 반복적으로 저장
이 스크립트는 계산을 수행한다는 의미에서 작동하지만, 상기 계산의 결과와 올바르게 작동하는 데 어려움을 겪습니다. 각 열 및 데이터의 각 행에 대한
import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
| 0| 1| 0| 0| 0|
| 1| 1| 0| 0| 1|
| 0| 0| 1| 0| 1|
| 1| 0| 1| 1| 0|
| 1| 1| 0| 0| 0|
+---+---+---+---+---+
# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 0| 1| p1|
|null| 1| p2|
| 0| 0| p3|
|null| 0| p4|
| 1|null| p5|
+----+----+-----+
# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 4| 3| p1|
|null| 1| p2|
| 2| 2| p3|
|null| 3| p4|
| 3|null| p5|
+----+----+-----+
# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
if len(w[row==v])>0:
return float(np.sum(w[row==v])/len(w))
else:
return 0.0
은, 상기 함수가 적용된다.
# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we select only the useful columns
df_select= df.select(defined_col)
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))
이 제공 : 나는 그것을 요청으로
+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
| 0| 1| 0| 0| 0|2.0|
| 1| 1| 0| 0| 1|1.0|
| 0| 0| 1| 0| 1|2.0|
| 1| 0| 1| 1| 0|0.0|
| 1| 1| 0| 0| 0|0.0|
+---+---+---+---+---+---+
그것은 썰어 DataFrame에 열을 추가했다. 문제는 오히려 최종 결과를 참조하기 위해 액세스 할 수있는 새 데이터로 데이터를 수집한다는 것입니다.
pandas에서와 같이 PySpark에서 DataFrame을 성장시킬 수 있습니까?
편집 내 목표는 명확하게하기 :
가 이상적으로이처럼 그냥 계산 열이있는 DataFrame을 얻을 것입니다 :
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
스파크의 데이터 프레임은 변경 불가능합니다. 각 행에 대해 새 데이터 프레임을 만들고 이전 데이터 프레임과 함께 유니온을 적용해야 새 행에 '추가'가 생깁니다. –
귀하의 질문은 무엇입니까 (원하는 출력의 예가 바람직합니까) ?? – desertnaut
@desertnaut : 나는 편집했다, 희망이 있 나의 목표는 지금 더 명확하다. – Haboryme