2017-02-10 11 views
0

아파치 스파크의 캐시 된 Temp 테이블에 대한 파티션 프룬이 활성화되어 있습니까? 그렇다면 어떻게 구성합니까?캐시 된 테이블에 대한 스파크 SQL 파티션 프 루닝

내 데이터는 다른 설치에서 센서 판독 값의 묶음이며 한 행에는 installationName, tag, timestamp 및 value가 있습니다. 이제

val parquet = hc.read.parquet("/path_to_table/tablename") 
parquet.registerTempTable("tablename") 

나는 경우 :

rdd.toDF("installationName", "tag", "timestamp", "value") 
    .repartition($"installationName", $"tag") 
    .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output) 

내가 스파크 HiveContext를 사용하여 SQL 테이블에 다음 명령을 사용하여 해당 데이터를 읽어

나는 다음과 같은 명령을 사용하여 마루 형식의 데이터를 작성했습니다 이 테이블에서 SQL 쿼리를 실행하면 조각 정리가 예상대로 분할됩니다.

그리고 쿼리는 약 8 초 걸립니다. 그러나 메모리에 테이블을 캐시 한 다음 동일한 쿼리를 수행하면 항상 약 50 초가 걸립니다.

hc.sql("CACHE TABLE tablename") 
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

현재 Spark 1.6.1을 사용하고 있습니다.

+0

안녕하세요, 귀하의 의견을 보내 주셔서 감사합니다. 사실 저는 마루에 데이터를 쓰기 전에 다시 분할 작업을 수행합니다. 나는 또한 위의 쿼리를 repartitioning으로 테스트했으며 쿼리 시간이 20 초인 경우 더 효율적이지만 여전히 캐리지없이 마루 파일을 읽는 것보다 느립니다. 제 목적은 마루 파일에 모두 쓰는 것을 피하는 것입니다. 어쩌면 소스를 제공 할 수 있습니까? 캐싱 후에 파티션 정리가 지원되지 않는다는 것을 어떻게 알 수 있습니까? 여기에 답을 쓸 수 있다면 받아 들일 수 있습니다. –

+0

수정, 메모리에서 캐싱은 쿼리 시간을 1 초 미만으로 줄입니다. 물론 이것은 이미 수용 가능합니다. 나는 그것이 비늘다는 것을 의아해한다 : 이것은 나의 dasta의 단지 일부분이다, 나는 실제로 200 시간 이상 계속적으로 성장하고있다, 그래서 더 많은 자료, 모든 파티션을 통한 스캐닝에 더 많은 시간이 걸린다. 그래서 파티션 정리는 여기서 도움이되는 것처럼 보일 것이다. . –

답변

0

캐시가 작동하는 이유 때문입니다.

당신이 계획은 아래 참조있는 DataFrame, RDD 또는 데이터 집합을 실행에 프로세스의 어떤 종류를 호출

:

val df = sc.parallelize(1 to 10000).toDF("line") 
df.withColumn("new_line", col("line") * 10).queryExecution 

명령 당신에게 queryExecution 복귀 계획. 아래 코드의 논리 계획을 참조하십시오.

== Parsed Logical Plan == 
Project [*,('line * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- Scan ExistingRDD[_1#4] 

이 경우 코드에서 수행 할 모든 프로세스를 볼 수 있습니다. 이 같은 cache 함수를 호출 할 때 :

df.withColumn("new_line", col("line") * 10).cache().queryExecution 

이 결과는 다음과 같이 될 것입니다 :

이 실행이 당신에게 optmized 논리적 인 계획에 InMemoryRelation의 실행을 반환합니다
== Parsed Logical Plan == 
'Project [*,('line * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Optimized Logical Plan == 
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None 

== Physical Plan == 
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro... 

이 저장됩니다 귀하의 메모리에있는 데이터의 구조, 또는 귀하의 데이터가 정말 큰 경우 디스크에 엎질러 질 것입니다.

클러스터에이 시간을 저장하는 데 시간이 걸리므로 처음 실행시 약간 느려지지만 다른 곳에서 동일한 데이터를 다시 액세스해야 할 경우 DF 또는 RDD가 저장되고 스파크는 실행을 다시 요청하지 않습니다.

+0

답변 해 주셔서 감사합니다. 스파크 테이블에서 캐싱은 열정적 인 작업입니다. 즉, 쿼리를 처음 실행할 때 데이터가 이미 캐시되어 있음을 의미합니다. 데이터 캐싱은 실제로 여기에서 500 초가 걸리고 실제로 캐시 이후에는 쿼리 성능이 향상되고 이제는 모든 파티션을 스캔하는 데 50 초 밖에 걸리지 않습니다. 조회를 실행 한 횟수와 관계없이 성능은 항상 거의 같습니다. 귀하의 대답은 cahcing 후 파티션 잘라 내기에 대한 내 질문을 해결하지 않습니다. –