두 개의 데이터 프레임간에 외부 조인을 수행하는 스파크 작업이 있습니다. 첫 번째 데이터 프레임의 크기는 260GB이고 파일 형식은 2200 개의 파일로 분할되는 텍스트 파일이며 두 번째 데이터 프레임의 크기는 2GB입니다.S3에서 거대한 데이터를 빠르게 작성하기 위해 EMR에서 스파크 작업을 조정하는 방법

이 두 파일을 데이터 프레임 자체에로드하는 데 10 분이 걸립니다.

약 260GB의 데이터 프레임 출력을 S3에 쓰려면 약 1 시간이 걸립니다.

다음은 내 클러스터 정보입니다. 여기

Core:c3.4large 5 machines 

이 내 코드 내가 여기

     "Classification": "spark-defaults" 
     , "Properties": { 
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer" 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.maxConnections": "200" 

를 설정하고 내 클러스터 설정입니다

DISK:2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

각 c3.4xlarge 기계의 세부 사항입니다

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 

import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 

val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 

    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 

I 이 이외에 나는 HDFS에 데이터 쓰기를 시도했으나 HDFS 디렉토리에 쓰는 데는 같은 시간 (S3보다 4 분 이상)이 걸립니다.

추가 SQL 물리적 계획을 추가

메모리 내 일의 마지막 시간을 사용

몇 가지 작업 로그

I looking into the job execution on the below clusters .Here were some on my observations : 

Majority time is being consumed at RDD getting spilling in the disk . 


17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 


This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size : 

    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 

Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled . 

You will notice the same in the nodemanager Logs : 

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 

웹 UI에서 논리적 계획을 보여줍니다. 스크린 샷을 사용하여 코드가하는 일에 대한 아이디어를 사람들에게 제공하십시오. 감사. –


Spark의 웹 UI에서 쿼리에 대한 실제 계획에 대해 질문했습니다. 그러면 쿼리가 정확히 무엇을하는지 알 수 있습니다. –


@JacekLaskowski 오, 알았어. 나는 그를 갠젤리아에서 볼 수 있을까? – SUDARSHAN



당신은 RAM 각각 30GB의이있는 다섯 c3.4large EC2 인스턴스를 실행하는 것입니다. 따라서 합계가 150GB 밖에되지 않으므로 200GB가 넘는 데이터 프레임보다 훨씬 작습니다. 따라서 디스크가 많이 유출되었습니다. 어쩌면 r 유형 EC2 인스턴스 (계산에 최적화 된 c 유형과 반대되는 최적화 된 메모리)를 실행하고 성능이 향상되는지 확인할 수 있습니다.


내가 r4.4xlarge를 사용하더라도 거의 같은 시간이 걸립니다 ...V 코어는 여전히 1 – SUDARSHAN


@SUDARSHAN이 속성을 클러스터에 추가합니다. config {{ "분류": "스파크", "속성": { "maximizeResourceAllocation": "true"}}]' – Will