2017-12-15 11 views
2

SELECT [...] UNION ALL SELECT [...] 양식의 Spark SQL 문을 사용하는 경우 두 개의 SELECT 문을 병렬로 실행합니까? 내 특정 사용 사례에서 두 SELECT 두 가지 다른 데이터베이스 테이블을 쿼리하고 있습니다. 내가 예상했던 것과는 달리 Spark UI는 두 개의 SELECT 문이 순차적으로 수행된다고 제안하는 것으로 보입니다.다른 테이블의 두 SELECT 중 UNION ALL이 병렬로 실행됩니까?

== Physical Plan == 
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0 
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4) 
    +- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655]) 
     +- Exchange hashpartitioning(neighborhood#163, 4) 
     +- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L]) 
      +- *Project [neighborhood#163, (tip_amount#513/total_amount#514) AS tip_pct#654] 
       +- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514] 
        +- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
          +- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165] 
           +- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310] 
           +- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164)) 
            :- *Sort [curve#578 ASC NULLS FIRST], false, 0 
            : +- Exchange hashpartitioning(curve#578, 4) 
            :  +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579] 
            :  +- Union 
            :   :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524] 
            :   : +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0)) 
            :   :  +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},[email protected]) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point... 
            :   +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524] 
            :    +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0)) 
            :     +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},[email protected]) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point... 
            +- *Sort [curve#580 ASC NULLS FIRST], false, 0 
             +- Exchange hashpartitioning(curve#580, 4) 
              +- Generate inline(index#165), true, false, [curve#580, relation#581] 
              +- InMemoryTableScan [neighborhood#163, polygon#164, index#165] 
                +- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
                 +- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15] 
                  +- InMemoryTableScan [metadata#13, polygon#12, index#15] 
                    +- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods` 
                     +- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p... 

BigQueryTableRelation에 스캔의 형태로 두 SELECT의 존재의 결합 :

== 업데이트 1 == 아래

는 스파크 UI에 표시되는 물리적 계획이다. 이들은 순차적으로 실행되는 것 같습니다.

각 BigQuery 선택 항목은 개별 작업 (각각 하나의 스테이지로 구성됨)에서 순차적으로 실행됩니다. 나는 각각 4 개의 CPU와 26GB의 RAM을 가진 5 노드 YARN 클러스터를 돌린다. 사용자 정의 BigQuery 데이터 소스가 있는지 여부에 대해 궁금합니다. 나는 그렇게해서는 안된다. 어떤 경우에는, 참조, 데이터 소스는 여기에서 찾을 수 있습니다 : 나는 다음과 같은 로그 항목을 참조 스파크 로그에서 github.com/miraisolutions/spark-bigquery

== 업데이트 2 ==

:

17/12/19 14:36:24 INFO SparkSqlParser: Parsing command: SELECT `pickup_latitude` AS `pickup_latitude`, `pickup_longitude` AS `pickup_longitude`, `tip_amount` AS `tip_amount`, `total_amount` AS `total_amount` FROM ((SELECT * FROM `trips2014`) UNION ALL (SELECT * FROM `trips2015`)) `ggcyamhubf` WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))

불꽃이 쿼리를 최적화하고 데이터 소스 (이 경우의 BigQuery)까지 술어 민다. 그러나 해당 BigQuery 작업은 완전히 순차적으로 실행되는 것처럼 보입니다. 즉, 첫 번째 작업이 완료되면 두 번째 작업이 시작됩니다.

+0

BigQuery가 동일한 클라이언트의 두 개의 병렬 쿼리가 차례대로 실행되는 방식 일 수 있습니까? 웹 UI에서 쿼리 계획을 첨부 할 수 있습니까? –

+0

지금은 쿼리 계획을 사용할 수 없지만 Spark 로그에서 'DataSource'의'buildScan' (RelationProvider를 통해)이 순차적으로 호출되는 것처럼 보입니다. 순차적 처리는 Spark의 결과입니다 실행 계획. 현재 BigQuery 제한 사항을 암시하는 내용은 없습니다. –

+0

너무 많은 추측 ... 너무 작은 데이터는 우리를 도울 것입니다. 가능한 순차적 실행의 이유는 쿼리의 일부인 모든 작업을 처리 할 수있는 CPU 수입니다. 그래서 우리는 웹 UI에서 실행 통계가 심하게 필요합니다. 로그를 보관하지 않는 이유는 무엇입니까?> https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact? –

답변

2

TL; DR 예 (CPU 가용성에 따라) 보조 노트로

: 당신이 의심이 있다면, 당신은 또한 다시 또한 것 (union 다음에 자신의 스레드에서 두 건의 SELECT를 실행할 수 있습니다 CPU 수에 따라 다름), 실제로는 병렬 실행이 가능합니다.

의가 (아주 기본) 다음 쿼리를 사용하자 :

val q = spark.range(1).union(spark.range(2)) 

explain CPU의 관점에서 최종 실행에 대해 얘기하지만, 적어도 전체 단계의 코드 생성 사용 여부를 제공하지 않습니다 그리고 얼마나 멀리 쿼리 트리. 이 예에서

scala> q.explain 
== Physical Plan == 
Union 
:- *Range (0, 1, step=1, splits=8) 
+- *Range (0, 2, step=1, splits=8) 

, (두 개의 별도의 데이터 세트에 대한 책임이 있습니다) 두 Range 실제 운영자는 "codegend"등 자신의 실행 파이프 라인됩니다 얻을 것이다. 실행 시간은 파티션의 모든 행 처리를 마칠 시간입니다 (Java 코드 자체의 "메커니즘"을 사용하지 않고는 System.sleep 또는 유사한 것을 사용할 수있는 것처럼 빨리 수행 할 수 있습니다).

쿼리의 RDD 계보가 쿼리 실행에 대한 추가 정보를 제공 할 수 있습니다.

scala> q.rdd.toDebugString 
res4: String = 
(16) MapPartitionsRDD[17] at rdd at <console>:26 [] 
| MapPartitionsRDD[16] at rdd at <console>:26 [] 
| UnionRDD[15] at rdd at <console>:26 [] 
| MapPartitionsRDD[11] at rdd at <console>:26 [] 
| MapPartitionsRDD[10] at rdd at <console>:26 [] 
| ParallelCollectionRDD[9] at rdd at <console>:26 [] 
| MapPartitionsRDD[14] at rdd at <console>:26 [] 
| MapPartitionsRDD[13] at rdd at <console>:26 [] 
| ParallelCollectionRDD[12] at rdd at <console>:26 [] 

나는 어떤 단계가 없기 때문에 많지 않다 사이에 - 당신은 병렬화 할 수 틀리지 않는 - 그것은 16 개 파티션이 단지 하나의 단계이고 그것이 16 (마지막 작업으로 빨리 완료 예정된 작업).

즉,이 경우 주문이 중요 함을 의미합니다.


는 또한 this JIRA issue에 대한 UNION ALL 아니라면 정확히 귀하의 경우처럼 비슷한 모양 발견했다.

+1

예, 첫 번째 문장이 정확합니다. 노조의 DF는 같은 단계에서 읽 힙니다. 따라서 각 DF가 8 개의 파티션을 갖고있는 예제에서 2 개의 DF를 읽는 16 개의 작업이있는 스테이지가 있습니다. 별도의 스레드에서 DF를 정의 할 필요가 없습니다. 스테이지 UI에서이를 확인할 수도 있고 이벤트 타임 라인을 볼 수도 있습니다. 예상대로 DF가 병렬로 실행되는 작업을 확인해야합니다. – Silvio

+0

'InMemoryTableScan'은 당신이 쿼리의 일부분을 캐시했음을 의미합니다. 그렇지 않습니까? (중요하지는 않지만 묻고 싶은 가치가있다). 'UNION'이 순차적으로'SELECT'를 실행한다는 것을 어떻게 알 수 있습니까? 'BigQueryTableRelation' 당 파티션 수를 볼 수 없습니다. 5 노드 x 4 CPU = 20 CPU <- 그렇게 인상적이지는 않습니까? –

+0

네, 맞습니다 : 쿼리의 일부가 캐싱됩니다. 언급했듯이 클러스터는 그다지 크지 않지만이 경우에는 목적에 부합해야합니다. 두 번째 업데이트를 보시고 더 자세한 정보를 얻으시기 바랍니다. –