-1

3 개의 다른 PySpark DataFrames를 기반으로 계산을 수행하고 있습니다.Pyspark의 새로운 DataFrame에 반복적으로 저장

이 스크립트는 계산을 수행한다는 의미에서 작동하지만, 상기 계산의 결과와 올바르게 작동하는 데 어려움을 겪습니다. 각 열 및 데이터의 각 행에 대한

import sys 
import numpy as np 
from pyspark import SparkConf, SparkContext, SQLContext 
sc = SparkContext("local") 
sqlContext = SQLContext(sc) 

# Dummy Data 
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5']) 
df.show() 
+---+---+---+---+---+ 
| p1| p2| p3| p4| p5| 
+---+---+---+---+---+ 
| 0| 1| 0| 0| 0| 
| 1| 1| 0| 0| 1| 
| 0| 0| 1| 0| 1| 
| 1| 0| 1| 1| 0| 
| 1| 1| 0| 0| 0| 
+---+---+---+---+---+ 

# Values 
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index')) 
values.show() 
+----+----+-----+ 
| f1| f2|index| 
+----+----+-----+ 
| 0| 1| p1| 
|null| 1| p2| 
| 0| 0| p3| 
|null| 0| p4| 
| 1|null| p5| 
+----+----+-----+ 

# Weights 
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index')) 
weights.show() 
+----+----+-----+ 
| f1| f2|index| 
+----+----+-----+ 
| 4| 3| p1| 
|null| 1| p2| 
| 2| 2| p3| 
|null| 3| p4| 
| 3|null| p5| 
+----+----+-----+ 

# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V. 
# If there a no similarities between Row and V outputs 0 
def W_sum(row,v,w): 
    if len(w[row==v])>0: 
     return float(np.sum(w[row==v])/len(w)) 
    else: 
     return 0.0 

은, 상기 함수가 적용된다.

# We iterate over the columns of Values (except the last one called index) 
for val in values.columns[:-1]: 
    # we filter the data to work only with the columns that are defined for the selected Value 
    defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()] 
    # we select only the useful columns 
    df_select= df.select(defined_col) 
    # we retrieve the reference value and weights 
    V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten() 
    W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten() 
    W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType()) 
    df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns)))) 

이 제공 : 나는 그것을 요청으로

+---+---+---+---+---+---+ 
| p1| p2| p3| p4| p5| f1| 
+---+---+---+---+---+---+ 
| 0| 1| 0| 0| 0|2.0| 
| 1| 1| 0| 0| 1|1.0| 
| 0| 0| 1| 0| 1|2.0| 
| 1| 0| 1| 1| 0|0.0| 
| 1| 1| 0| 0| 0|0.0| 
+---+---+---+---+---+---+ 

그것은 썰어 DataFrame에 열을 추가했다. 문제는 오히려 최종 결과를 참조하기 위해 액세스 할 수있는 새 데이터로 데이터를 수집한다는 것입니다.
pandas에서와 같이 PySpark에서 DataFrame을 성장시킬 수 있습니까?

편집 내 목표는 명확하게하기 :
가 이상적으로이처럼 그냥 계산 열이있는 DataFrame을 얻을 것입니다 :

+---+---+ 
    | f1| f2| 
    +---+---+ 
    |2.0|1.0| 
    |1.0|2.0| 
    |2.0|0.0| 
    |0.0|0.0| 
    |0.0|2.0| 
    +---+---+ 
+1

스파크의 데이터 프레임은 변경 불가능합니다. 각 행에 대해 새 데이터 프레임을 만들고 이전 데이터 프레임과 함께 유니온을 적용해야 새 행에 '추가'가 생깁니다. –

+1

귀하의 질문은 무엇입니까 (원하는 출력의 예가 바람직합니까) ?? – desertnaut

+0

@desertnaut : 나는 편집했다, 희망이 있 나의 목표는 지금 더 명확하다. – Haboryme

답변

2

귀하의 질문에 몇 가지 문제가 있습니다 ...

첫째, for 루프는 오류를 생성합니다. 이는 마지막 줄의 df_select이 정의되지 않았기 때문입니다. 끝 부분에는 아무런 배정도 없습니다 (그것이 무엇을 생산합니까?). df_select 실제로 일부 라인 이전에 정의 된 사용자 subsubsample dataframe이라고하고 마지막 줄

new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns)))) 

다음 문제가 더 명확 점점 시작 같은임을 가정

. (f1에서의 결과는 단순히 덮어 쓰기되기 때문에, 자연)

values.columns[:-1] 
# ['f1', 'f2'] 

때문에 모든 루프의 결과는

+---+---+---+---+---+ 
| p1| p2| p3| p4| f2| 
+---+---+---+---+---+ 
| 0| 1| 0| 0|1.0| 
| 1| 1| 0| 0|2.0| 
| 0| 0| 1| 0|0.0| 
| 1| 0| 1| 1|0.0| 
| 1| 1| 0| 0|2.0| 
+---+---+---+---+---+ 

포함 열만 f2로 될 것이다.내가 말했듯이

이제, 상황이 같다 가정, 당신의 문제는 오히려 다른 dataframes에, 당신은 단지 subsubsample 잊을 수에 열을 추가 함께 두 열 f1 & f2이 방법을 실제로 당신의 df 초기, 아마도 나중에 원치 않는 사람을 삭제 :

init_cols = df.columns 
init_cols 
# ['p1', 'p2', 'p3', 'p4', 'p5'] 

new_df = df 

for val in values.columns[:-1]: 
    # we filter the data to work only with the columns that are defined for the selected Value 
    defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()] 
    # we retrieve the reference value and weights 
    V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten() 
    W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten() 
    W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType()) 
    new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here 

# drop initial columns: 
for i in init_cols: 
    new_df = new_df.drop(i) 

new_df를 발생하는 것입니다 :

+---+---+ 
| f1| f2| 
+---+---+ 
|2.0|1.0| 
|1.0|2.0| 
|2.0|0.0| 
|0.0|0.0| 
|0.0|2.0| 
+---+---+ 

UPDATE (주석 후) : 부동 소수점 사용 할 수 있도록 W_sum 기능의 부서를 강제로 :

지금
from __future__ import division 

new_df이 될 것입니다 : 정확히 f2

+---------+----+ 
|  f1| f2| 
+---------+----+ 
|  2.0| 1.5| 
|1.6666666|2.25| 
|2.3333333|0.75| 
|  0.0|0.75| 
|0.6666667|2.25| 
+---------+----+ 

그것이 있어야로 당신의 의견에 따르면.

+0

답장을 보내 주셔서 감사합니다. 마지막으로 결과는 잘립니다 (f2는 1.5, 2.25, 0.75, 0.75, 2.25가되어야합니다). – Haboryme

+0

@Haboryme는 UDF의 세부 사항을 전혀 검토하지 않았습니다. 나는 질문에 관한 부분을 추가하는 열에 만 초점을 맞추었고, 나는 당신의 원하는 출력을 틀림없이 복제했다. 그럼에도 불구하고 업데이 트를 참조하십시오 ... – desertnaut

+1

끝내! 감사. – Haboryme