2017-12-10 10 views
0

에 2.2.0 dataframe 분할 쓰기 pyspark 각 파티션 (그룹)을 S3의 자체 위치에 기록합니다.병렬화는 pyspark 작업 및 내 코드로 만든 병목 현상으로 실행 시작 S3

drive_id로 파티션 된 S3 위치에 Athena 테이블을 정의해야합니다. drive_id로 쿼리하면 데이터를 매우 효율적으로 읽을 수 있습니다.

 #df is spark dataframe 
     g=df.groupBy(df.drive_id) 
     rows=sorted(g.count().collect()) 
     #each row is a parition 
     for row in rows: 
      w=df.where((col("drive_id") == row.drive_id)) 
     w.write.mode('append').parquet("s3n://s3bucket/parquet/drives/"+str(table)+"/drive_id="+str(row.drive_id)) 

문제는 루프가 직렬 처리를하고 드라이브 파티션에 하나씩 만 쓰기 때문입니다.

분명히 이것은 단일 파티션 쓰기 작업이 매우 작고 병렬 처리가 많은 양을주지 않기 때문에 잘 확장되지 않습니다.

단일 작업으로 모든 파티션을 다른 위치에 기록하는 단일 쓰기 명령으로 루프를 바꾸려면 어떻게해야합니까?

이 작업은 드라이버가 아닌 스파크 작업자에서 실행되도록 병렬 처리해야합니다.

답변

1

나는 대답을 알아 냈습니다.

dataframe.write.parquet에는 선택적 매개 변수 partitionBy (names_of_partitioning_columns)가 있습니다.

그래서 "에 의해 그룹"의 필요없이 루프의 필요가 없습니다 :

df.write.partitionBy(drive_id).parquet("s3n://s3bucket/dir") 

표준 하이브 형식 "S3N에 파티션을 생성합니다 : 한 줄을 사용하여 // s3bucket/디렉토리/drive_id = 123 "