2017-11-26 17 views
2

저는이 RDD의 각 요소가 XML 레코드를 나타내는 JAXB 루트 요소 인 Spark RDD를 작성했습니다.RDD를 6 부분으로 나누는 방법은?

이 RDD를 분할하여이 세트에서 6 개의 RDD를 생성하려고합니다. 기본적으로이 작업은 계층 적 XML 구조를 6 세트의 플랫 CSV 레코드로 간단하게 변환합니다. 나는 현재 6 번 RDD 6 번을이 작업에 사용하고 있습니다.

내 입력 데이터 세트는 3500 만 개입니다. Amazon S3에 저장된 각 448MB의 186 개 파일로 분할됩니다. 내 출력 디렉토리도 S3에 있습니다. EMR 스파크를 사용하고 있습니다.

6 노드 m4.4xlarge 클러스터를 사용하면이 분할을 완료하고 출력을 쓰는 데 38 분이 걸립니다.

RDD를 6 번 걷지 않고도이를 달성 할 수있는 효율적인 방법이 있습니까?

답변

5

Spark 개발자의 관점에서 가장 쉬운 해결책은 별도의 스레드에서 RDD 당 mapsaveAsTextFile을 수행하는 것입니다.

SparkContext은 스레드로부터 안전하므로 별도의 스레드에서 작업을 제출하는 데 사용할 수 있다는 사실은 널리 알려지지 않았으므로 (따라서 악용 된) 사실입니다.

그렇게 말한다면, 당신은 다음을 수행 할 수 (Future와 간단한 스칼라 솔루션을 사용하지만 반드시 Future로 가장 당신이 그렇게 말을하지 인스턴스화 시간에 계산 시작) :

xmlRdd.cache() 

import scala.concurrent.ExecutionContext.Implicits.global 
val f1 = Future { 
    val rddofTypeA = xmlRdd.map { map xml to csv} 
    rddOfTypeA.saveAsTextFile("s3://...") 
} 

val f2 = Future { 
    val rddofTypeB = xmlRdd.map { map xml to csv} 
    rddOfTypeB.saveAsTextFile("s3://...") 
} 

... 

Future.sequence(Seq(f1,f2)).onComplete { ... } 

수를 매핑 및 저장 작업을 수행하는 시간은 줄 였지만 데이터 세트에 대한 스캔 횟수는 줄이지 ​​않았습니다. 어쨌든 데이터 세트가 캐싱되어 메모리 및/또는 디스크에 저장되어 있으므로 큰 문제는 아닙니다 (스파크 SQL의 Dataset.cache에서 기본 지속성 수준은 MEMORY_AND_DISK입니다).

0

출력 경로에 대한 요구 사항에 따라 DataFrameWriter이라는 간단한 partitionByClause을 사용하여 해결할 수 있습니다.

여러 맵 대신 xmlRdd 요소를 취하고 SeqTuples으로 반환하는 함수를 디자인하십시오. 일반적인 구조는 다음과 같이 될 것이다 :

def extractTypes(value: T): Seq[(String, String)] = { 
    val a: String = extractA(value) 
    val b: String = extractB(value) 
    ... 
    val f: String = extractF(value) 
    Seq(("A", a), ("B", b), ..., ("F", f)) 
} 

flatMap, Dataset로 변환 및 쓰기 :

xmlRdd.flatMap(extractTypes _).toDF("id", "value").write 
    .partitionBy("id") 
    .option("escapeQuotes", "false") 
    .csv(...) 
+0

겠습니까 사용자 정의 [파티션 설정 (HTTP로하지'extractTypes' 행위 : //spark.apache. org/docs/latest/api/scala/index.html # org.apache.spark.Partitioner)? –