2016-12-15 4 views
0

나는 다음과 같은 PySpark 코드를 실행 한 :Google Dataproc에서 실행되는 Spark에서 saveAsTextFile을 사용하는 동안 로컬 디스크 또는 HDFS 대신 외부 저장소 (GCS)에 임시 파일을 저장하는 이유는 무엇입니까?

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 

작업이 성공적으로 완료. 그러나 작업 실행 중에 Spark는 다음 경로에서 많은 일시적인 얼룩을 만들었습니다 gs://bucket-name/output_blob_path/_temporary/0/. 최종적으로이 모든 임시 블로 브를 제거하면 작업 실행 시간의 절반이 소요되고이 시간 동안 CPU 사용률이 1 % (엄청난 자원 낭비)라는 것을 깨달았습니다.

임시 파일을 GCP 대신 로컬 드라이브 (또는 HDFS)에 저장할 수 있습니까? 나는 여전히 최종 결과 (정렬 된 데이터 집합)를 GCP로 유지하려고합니다.

우리는 작업자 노드가 10 개인 Dataproc Spark 클러스터 (VM 유형 16 코어, 60GM)를 사용하고있었습니다. 입력 데이터의 양은 10TB였습니다.

답변

1

_temporary 파일은 FileOutputCommitter의 아티팩트 일 가능성이 높습니다. 중요하게도 이러한 임시 BLOB는 엄격하게 "임시"데이터가 아니었지만 사실 작업 완료시 최종 대상으로 "이름이 바뀐"출력 데이터 만 완성되었습니다. 원본과 대상 모두 GCS에 있기 때문에 이름 바꾸기를 통해 이러한 파일을 "커밋"하는 것은 실제로 빠릅니다. 이런 이유로 HDFS에서 임시 파일을 배치 한 다음 GCS로 "커밋"하는 작업 흐름을 대체 할 방법이 없습니다. 왜냐하면 커밋은 HDFS에서 GCS로 전체 출력 데이터 세트를 다시 배관해야하기 때문입니다. 특히 기본 Hadoop FileOutputFormat 클래스는 그러한 관용구를 지원하지 않습니다.

GCS 자체는 실제 파일 시스템이 아니지만 "객체 저장소"이며 Dataproc의 GCS 커넥터는 HDFS를 가장 잘 모방합니다. 결과적으로 파일의 디렉토리 채우기를 삭제하면 실제로 icode의 링크를 해제하는 실제 파일 시스템이 아니라 GFS가 개별 객체를 삭제해야합니다.

실제로이 문제를 겪고 있다면 출력이 너무 많은 파일로 분할된다는 것을 의미합니다. 정리는 한 번에 ~ 1000 개의 파일로 이루어지기 때문에 가능합니다. 따라서 수천 개의 출력 파일이 눈에 띄게 느려서는 안됩니다. 파일이 너무 많으면 나중에 파일을 더 느리게 만들 수 있습니다. 설명에 대한

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 
+0

감사합니다 : 가장 쉬운 수정은 일반적으로 예를 들어 repartition()를 사용하여 가능하면 바로 출력 파일의 수를 줄이는 것입니다. BigQuery에서 GCS로 내 보낸 데이터를 정렬하므로 파일이 너무 많아서 조금 놀랐습니다. 내 가정은 BiqQuery 내보내기 기능이 이미 파티션 수 (GCS에 데이터 세트를 저장하기위한 최적의 파일 수)를 최적화한다는 것입니다. – user2548047

+0

적용되는 RDD 작업의 종류에 따라 변환 후 파티션 수는 입력 파티션 수와 같지 않을 수 있으며이 경우 FileInputFormat은 기본적으로 입력 파일을 기본적으로 더 작은 파티션으로 잘라냅니다. 입력 파일의 수 '--properties spark.hadoop.fs.gs.block.size = 536870912'로 이것을 조정하여 기본값 인 64MB 대신 512MB로 늘릴 수 있습니다. –

+0

클러스터 배포 시간에 기본적으로 조정할 수도 있습니다. 'gcloud dataproc 클러스터는 여러분의 작업이 보통 10TB 범위 내에 있다면 my-cluster --properties core : fs.gs.block.size = 536870912'를 만들 것입니다. 당신의 일자리가 10GB 일 경우 너무 높을 것입니다. 대부분의 경우 1000 개가 넘는 파티션과 50000 개 미만의 파티션을 목표로하는 것이 좋지만 작은 작업에도 64MB보다 작은 블록 크기로 가고 싶지는 않습니다. –