2016-11-08 3 views
1

AWS EMR의 Spark (현재 m4.xlarge 마스터 인스턴스와 2 개의 m4가있는)에서 처리하려고하는 s3의 대형 (약 85GB 압축) gzipped 파일이 있습니다. 100GB EBS 볼륨이있는 10 개의 대형 코어 인스턴스). gzip은 분할 할 수없는 파일 형식이고, I'veseenitsuggested 압축 파일을 다시 분할해야한다는 점을 알고 있습니다. Spark은 처음에 하나의 파티션으로 RDD를 제공하기 때문입니다. 그러나큰 gzipped 파일 다루기

scala> val raw = spark.read.format("com.databricks.spark.csv"). 
    | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")). 
    | load("s3://path/to/file.gz"). 
    | repartition(sc.defaultParallelism * 3) 
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields 
scala> raw.count() 

을하고 스파크 응용 프로그램의 UI를 살펴 복용 후, 또는 적어도 나 '(I는 여전히 하나 개의 작업으로 하나의 활성 집행을 (다른 (14)가 죽은)를 참조하고, 작업이 완료되지 않습니다 오래 기다리지 않았다).

  • 여기 무슨 일이 일어나고 있습니까? 이 예제에서 스파크가 어떻게 작동하는지 이해할 수 있습니까?
  • 다른 클러스터 구성을 사용해야합니까?
  • 아쉽게도 압축 모드를 제어 할 수 없지만 이러한 파일을 처리 할 수있는 대체 방법이 있습니까?

답변

3

파일 형식이 분할 가능하지 않으면 하나의 코어에서 파일 전체를 읽지 않도록 할 방법이 없습니다. 작업을 병렬화하려면 작업 단위를 다른 컴퓨터에 할당하는 방법을 알아야합니다. gzip의 경우, 128M 덩어리로 나누면된다. n 번째 청크는 n-1 번째 청크의 위치 정보에 따라 n-2-n 번째 청크에 의존하는 압축 풀기 방법을 알기 때문에, 첫 번째 청크까지 내려갑니다.

병렬 처리하려면이 파일을 분할 가능하게 만들어야합니다. 한 가지 방법은 압축을 풀고 압축을 풀거나 압축을 풀고 여러 파일 (원하는 각 병렬 작업에 대해 하나의 파일)로 분할 한 다음 각 파일을 gzip으로 압축하는 것입니다.

+1

스파크가 파일을 다시 파티션하기 전에 파일의 압축을 해제한다는 인상하에있었습니다. 그렇지 않은가요? 내가 말한 네 개의 링크는 무엇입니까? – user4601931

+0

예, Spark는 병렬 처리를 증가시키기 위해 파일을 섞기 전에 먼저 파일 전체를 압축 해제합니다 (한 코어의 80G). – Tim

+0

자, 고마워. 내 클러스터가이 작업을 처리 할 수있을 것이라고 생각합니까? 그렇다면 전체 파일의 압축을 풀고 파티션을 다시 나누고 추가 처리를 수행하려면'spark.dynamicAllocation.enabled = true'를 설정하면 (가능한 한 많은 메모리를 사용하여) 하나의 실행 프로그램을 확보 할 수 있다고 생각합니까? 압축을 풀고 프로세싱을 한 후에 더 많은 집행자 (메모리가 적지 만 많은 코어)를 수행합니까? – user4601931