에 2.2.0 dataframe 분할 쓰기 pyspark 각 파티션 (그룹)을 S3의 자체 위치에 기록합니다.병렬화는 pyspark 작업 및 내 코드로 만든 병목 현상으로 실행 시작 S3
drive_id로 파티션 된 S3 위치에 Athena 테이블을 정의해야합니다. drive_id로 쿼리하면 데이터를 매우 효율적으로 읽을 수 있습니다.
#df is spark dataframe
g=df.groupBy(df.drive_id)
rows=sorted(g.count().collect())
#each row is a parition
for row in rows:
w=df.where((col("drive_id") == row.drive_id))
w.write.mode('append').parquet("s3n://s3bucket/parquet/drives/"+str(table)+"/drive_id="+str(row.drive_id))
문제는 루프가 직렬 처리를하고 드라이브 파티션에 하나씩 만 쓰기 때문입니다.
분명히 이것은 단일 파티션 쓰기 작업이 매우 작고 병렬 처리가 많은 양을주지 않기 때문에 잘 확장되지 않습니다.
단일 작업으로 모든 파티션을 다른 위치에 기록하는 단일 쓰기 명령으로 루프를 바꾸려면 어떻게해야합니까?
이 작업은 드라이버가 아닌 스파크 작업자에서 실행되도록 병렬 처리해야합니다.