두 개의 데이터 프레임간에 외부 조인을 수행하는 스파크 작업이 있습니다. 첫 번째 데이터 프레임의 크기는 260GB이고 파일 형식은 2200 개의 파일로 분할되는 텍스트 파일이며 두 번째 데이터 프레임의 크기는 2GB입니다.S3에서 거대한 데이터를 빠르게 작성하기 위해 EMR에서 스파크 작업을 조정하는 방법
이 두 파일을 데이터 프레임 자체에로드하는 데 10 분이 걸립니다.
약 260GB의 데이터 프레임 출력을 S3에 쓰려면 약 1 시간이 걸립니다.
다음은 내 클러스터 정보입니다. 여기
emr-5.9.0
Master:1m3.2xlarge
Core:c3.4large 5 machines
이 내 코드 내가 여기
[
{
"Classification": "spark-defaults"
, "Properties": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
}
},{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.maxConnections": "200"
}
}
]
를 설정하고 내 클러스터 설정입니다
CPU:16
RAM:30
DISK:2 × 160 GB SSD
Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2
각 c3.4xlarge 기계의 세부 사항입니다
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN"))
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name))
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer")
.select($"FundamentalSeriesId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"),
when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"),
when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"),
when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"),
when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"),
$"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))
val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq
val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|")
val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FundamentalAnalytic/output")
I 이 이외에 나는 HDFS에 데이터 쓰기를 시도했으나 HDFS 디렉토리에 쓰는 데는 같은 시간 (S3보다 4 분 이상)이 걸립니다.
DAG
추가 SQL 물리적 계획을 추가메모리 내 일의 마지막 시간을 사용
012 여기3,516,몇 가지 작업 로그
I looking into the job execution on the below clusters .Here were some on my observations :
Majority time is being consumed at RDD getting spilling in the disk .
#########
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far)
#########
This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size :
#########################
17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead)
#########################
Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled .
You will notice the same in the nodemanager Logs :
2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used
웹 UI에서 논리적 계획을 보여줍니다. 스크린 샷을 사용하여 코드가하는 일에 대한 아이디어를 사람들에게 제공하십시오. 감사. –
Spark의 웹 UI에서 쿼리에 대한 실제 계획에 대해 질문했습니다. 그러면 쿼리가 정확히 무엇을하는지 알 수 있습니다. –
@JacekLaskowski 오, 알았어. 나는 그를 갠젤리아에서 볼 수 있을까? – SUDARSHAN