6

두 개의 데이터 프레임간에 외부 조인을 수행하는 스파크 작업이 있습니다. 첫 번째 데이터 프레임의 크기는 260GB이고 파일 형식은 2200 개의 파일로 분할되는 텍스트 파일이며 두 번째 데이터 프레임의 크기는 2GB입니다.S3에서 거대한 데이터를 빠르게 작성하기 위해 EMR에서 스파크 작업을 조정하는 방법

이 두 파일을 데이터 프레임 자체에로드하는 데 10 분이 걸립니다.

약 260GB의 데이터 프레임 출력을 S3에 쓰려면 약 1 시간이 걸립니다.

다음은 내 클러스터 정보입니다. 여기

emr-5.9.0 
Master:1m3.2xlarge 
Core:c3.4large 5 machines 

이 내 코드 내가 여기

[ 
    { 
     "Classification": "spark-defaults" 
     , "Properties": { 
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer" 
     } 
    },{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.maxConnections": "200" 
    } 
    } 
] 

를 설정하고 내 클러스터 설정입니다

CPU:16 
RAM:30 
DISK:2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

각 c3.4xlarge 기계의 세부 사항입니다

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 



import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 


val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 


val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     .select($"FundamentalSeriesId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     $"AuditID_1").otherwise($"AuditID").as("AuditID"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 



    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
    dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear") 
     .format("csv") 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 
     .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output") 

I 이 이외에 나는 HDFS에 데이터 쓰기를 시도했으나 HDFS 디렉토리에 쓰는 데는 같은 시간 (S3보다 4 분 이상)이 걸립니다.

enter image description here

모든 단계 DAG enter image description here

DAG

추가 SQL 물리적 계획을 추가

enter image description here

메모리 내 일의 마지막 시간을 사용

012 여기

enter image description here

3,516,몇 가지 작업 로그

I looking into the job execution on the below clusters .Here were some on my observations : 

Majority time is being consumed at RDD getting spilling in the disk . 

######### 

17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 

######### 

This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size : 

######################### 
    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 
######################### 

Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled . 

You will notice the same in the nodemanager Logs : 

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 
+0

웹 UI에서 논리적 계획을 보여줍니다. 스크린 샷을 사용하여 코드가하는 일에 대한 아이디어를 사람들에게 제공하십시오. 감사. –

+0

Spark의 웹 UI에서 쿼리에 대한 실제 계획에 대해 질문했습니다. 그러면 쿼리가 정확히 무엇을하는지 알 수 있습니다. –

+0

@JacekLaskowski 오, 알았어. 나는 그를 갠젤리아에서 볼 수 있을까? – SUDARSHAN

답변

1

당신은 RAM 각각 30GB의이있는 다섯 c3.4large EC2 인스턴스를 실행하는 것입니다. 따라서 합계가 150GB 밖에되지 않으므로 200GB가 넘는 데이터 프레임보다 훨씬 작습니다. 따라서 디스크가 많이 유출되었습니다. 어쩌면 r 유형 EC2 인스턴스 (계산에 최적화 된 c 유형과 반대되는 최적화 된 메모리)를 실행하고 성능이 향상되는지 확인할 수 있습니다.

+0

내가 r4.4xlarge를 사용하더라도 거의 같은 시간이 걸립니다 ...V 코어는 여전히 1 – SUDARSHAN

+1

@SUDARSHAN이 속성을 클러스터에 추가합니다. config {{ "분류": "스파크", "속성": { "maximizeResourceAllocation": "true"}}]' – Will