SELECT [...] UNION ALL SELECT [...]
양식의 Spark SQL 문을 사용하는 경우 두 개의 SELECT
문을 병렬로 실행합니까? 내 특정 사용 사례에서 두 SELECT
두 가지 다른 데이터베이스 테이블을 쿼리하고 있습니다. 내가 예상했던 것과는 달리 Spark UI는 두 개의 SELECT
문이 순차적으로 수행된다고 제안하는 것으로 보입니다.다른 테이블의 두 SELECT 중 UNION ALL이 병렬로 실행됩니까?
== Physical Plan ==
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655])
+- Exchange hashpartitioning(neighborhood#163, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L])
+- *Project [neighborhood#163, (tip_amount#513/total_amount#514) AS tip_pct#654]
+- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514]
+- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165]
+- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310]
+- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164))
:- *Sort [curve#578 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(curve#578, 4)
: +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579]
: +- Union
: :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524]
: : +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0))
: : +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},[email protected]) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
: +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524]
: +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0))
: +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},[email protected]) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
+- *Sort [curve#580 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(curve#580, 4)
+- Generate inline(index#165), true, false, [curve#580, relation#581]
+- InMemoryTableScan [neighborhood#163, polygon#164, index#165]
+- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15]
+- InMemoryTableScan [metadata#13, polygon#12, index#15]
+- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods`
+- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p...
주 BigQueryTableRelation
에 스캔의 형태로 두 SELECT
의 존재의 결합 :
== 업데이트 1 == 아래
는 스파크 UI에 표시되는 물리적 계획이다. 이들은 순차적으로 실행되는 것 같습니다.
각 BigQuery 선택 항목은 개별 작업 (각각 하나의 스테이지로 구성됨)에서 순차적으로 실행됩니다. 나는 각각 4 개의 CPU와 26GB의 RAM을 가진 5 노드 YARN 클러스터를 돌린다. 사용자 정의 BigQuery 데이터 소스가 있는지 여부에 대해 궁금합니다. 나는 그렇게해서는 안된다. 어떤 경우에는, 참조, 데이터 소스는 여기에서 찾을 수 있습니다 : 나는 다음과 같은 로그 항목을 참조 스파크 로그에서 github.com/miraisolutions/spark-bigquery
== 업데이트 2 ==
:
17/12/19 14:36:24 INFO SparkSqlParser: Parsing command: SELECT `pickup_latitude` AS `pickup_latitude`, `pickup_longitude` AS `pickup_longitude`, `tip_amount` AS `tip_amount`, `total_amount` AS `total_amount` FROM ((SELECT * FROM `trips2014`) UNION ALL (SELECT * FROM `trips2015`)) `ggcyamhubf` WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))
불꽃이 쿼리를 최적화하고 데이터 소스 (이 경우의 BigQuery)까지 술어 민다. 그러나 해당 BigQuery 작업은 완전히 순차적으로 실행되는 것처럼 보입니다. 즉, 첫 번째 작업이 완료되면 두 번째 작업이 시작됩니다.
BigQuery가 동일한 클라이언트의 두 개의 병렬 쿼리가 차례대로 실행되는 방식 일 수 있습니까? 웹 UI에서 쿼리 계획을 첨부 할 수 있습니까? –
지금은 쿼리 계획을 사용할 수 없지만 Spark 로그에서 'DataSource'의'buildScan' (RelationProvider를 통해)이 순차적으로 호출되는 것처럼 보입니다. 순차적 처리는 Spark의 결과입니다 실행 계획. 현재 BigQuery 제한 사항을 암시하는 내용은 없습니다. –
너무 많은 추측 ... 너무 작은 데이터는 우리를 도울 것입니다. 가능한 순차적 실행의 이유는 쿼리의 일부인 모든 작업을 처리 할 수있는 CPU 수입니다. 그래서 우리는 웹 UI에서 실행 통계가 심하게 필요합니다. 로그를 보관하지 않는 이유는 무엇입니까?> https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact? –