2017-04-10 2 views
0

특정 열의 데이터 세트에서 문자열을 대체하려고 시도했습니다. 1 또는 0, 1이면 1, 그렇지 않으면 0입니다.rdd 구별 대신 pyspark sql 함수

람다를 사용하여 데이터 프레임 대 rdd 변환을 사용하여 대상으로 지정할 열을 식별 할 수 있지만 처리하는 데 시간이 걸립니다.

각 열에 대해 rdd 로의 전환이 수행 된 후 구별이 수행됩니다.

고유 한 결과 세트에 'Y'가 있으면 변환이 필요한 것으로 식별됩니다.

누구나 내가 각 열을 바꾸지 않고 동일한 결과를 얻기 위해 pyspark SQL 함수를 어떻게 독점적으로 사용할 수 있는지 궁금한가요? 다음과 같이

코드는 샘플 데이터에 있습니다 :

import pyspark.sql.types as typ 
    import pyspark.sql.functions as func 

    col_names = [ 
     ('ALIVE', typ.StringType()), 
     ('AGE', typ.IntegerType()), 
     ('CAGE', typ.IntegerType()), 
     ('CNT1', typ.IntegerType()), 
     ('CNT2', typ.IntegerType()), 
     ('CNT3', typ.IntegerType()), 
     ('HE', typ.IntegerType()), 
     ('WE', typ.IntegerType()), 
     ('WG', typ.IntegerType()), 
     ('DBP', typ.StringType()), 
     ('DBG', typ.StringType()), 
     ('HT1', typ.StringType()), 
     ('HT2', typ.StringType()), 
     ('PREV', typ.StringType()) 
     ] 

    schema = typ.StructType([typ.StructField(c[0], c[1], False) for c in col_names]) 
    df = spark.createDataFrame([('Y',22,56,4,3,65,180,198,18,'N','Y','N','N','N'), 
           ('N',38,79,3,4,63,155,167,12,'N','N','N','Y','N'), 
           ('Y',39,81,6,6,60,128,152,24,'N','N','N','N','Y')] 
           ,schema=schema) 

    cols = [(col.name, col.dataType) for col in df.schema] 

    transform_cols = [] 

    for s in cols: 
     if s[1] == typ.StringType(): 
     distinct_result = df.select(s[0]).distinct().rdd.map(lambda row: row[0]).collect() 
     if 'Y' in distinct_result: 
      transform_cols.append(s[0]) 

    print(transform_cols) 

출력은 다음과 같습니다

['ALIVE', 'DBG', 'HT2', 'PREV'] 

답변

1

내가 작업을하기 위해 udf를 사용할 수 있었다. 당신은 N1에, 0Y을지도하기 위해 udf 기능을 만들 수

cols_sel = df.select([func.first(col).alias(col) for col in df.columns]).collect()[0].asDict() 
cols = [col_name for (col_name, v) in cols_sel.items() if v in ['Y', 'N']] 
# return ['HT2', 'ALIVE', 'DBP', 'HT1', 'PREV', 'DBG'] 

다음 : 첫째, Y 또는 N와 열 (여기에 내가 첫 번째 행을 통해 탈지하기 위해 func.first를 사용)를 선택하십시오.

def map_input(val): 
    map_dict = dict(zip(['Y', 'N'], [1, 0])) 
    return map_dict.get(val) 
udf_map_input = func.udf(map_input, returnType=typ.IntegerType()) 

for col in cols: 
    df = df.withColumn(col, udf_map_input(col)) 
df.show() 

마지막으로 열을 합칠 수 있습니다. 그때 나는 사전에 출력을 변환하고 0보다 큰 값을 가진 열을 확인

out = df.select([func.sum(col).alias(col) for col in cols]).collect() 
out = out[0] 
print([col_name for (col_name, val) in out.asDict().items() if val > 0]) 

출력

['DBG', 'HT2', 'ALIVE', 'PREV'] 
+1

감사 (즉 Y를 포함), 그것은 반드시 더 효율적 아니지만, 그것은 유용 내가 다른 곳으로 가려면 pyspark를 처음 사용하는 것입니다. – alortimor

+0

환영합니다! 그게 조금 도움이 되었길 바래요! – titipata