0

SparkSQL 2.2.0을 사용하여 Cassandra에서 데이터를로드하고 Elasticsearch에 인덱스를 지정합니다. 내가 가지고있는 데이터는 고객 (첫 번째 테이블 people)과 주문 (두 번째 테이블 orders)으로 구성됩니다.
테이블 주문에는 해당 고객을 가리키는 열 person_id이 있습니다.
내 고객은 people 테이블과 orders을 쿼리 (그리고 Elasticsearch에서 나중에 인덱스)하여 각 고객에게 구매 한 주문 수를 줄 수 있습니다.
내가 알아 낸 가장 쉬운 방법은 두 테이블을 org.apache.spark.sql.Dataset<Row>으로 읽은 다음 person_id 열의에 합류시키는 것입니다. 그럼 내가 groupBy(person_id).
그게 저에게 두 개의 열이있는 데이터 세트를 제공합니다 : person_idcount 나는 people 테이블로 다시 가입 할 의무가 있으므로 다른 사람 데이터로 계산할 수 있습니다.SparkSQL 부모/자식 데이터 집합 합류

Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer"); 

Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId"); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

people 표 1_000_000 행과 하나 orders 2_500_000있다. 각 고객은 2 ~ 3 개의 주문을받습니다.
2,250MHz Intel Core i7 프로세서 및 16GB 1600MHz DDR3 메모리가있는 MAC Book Pro를 사용하고 있습니다. 모든 Cassandra, Spark 2.2 마스터 및 (단일) 작업자가 같은 시스템에 있습니다.
이 3 개의 조인은 15-20 초가 걸립니다.
내 질문에 : 성능 향상을위한 여지가 있습니다. 창 합계 함수에는 로그에서 ShuffleMapTask가 표시되므로 이점이 있습니다.

미리 감사드립니다.

답변

0

첫 번째 단계는 필요하지 않다고 생각합니다. 당신은 이것을 할 수 있습니다 :

Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count(); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

나는 이것이 도움이되기를 바랍니다.

+0

네, 맞습니다. 내 잘못이야. 그러나 그것은 여전히 ​​"상대적으로 느리다"(ab 16s). "Window Aggregate Functions"가 도움이되는지 또는 정상적인 방법인지 궁금합니다. –

+0

제가 아는 한, 이것이 그것을 수행하는 방법입니다. 특히 '그룹별'의 경우. 사용자 정의 집계 함수 (UDAF)를 살펴볼 수 있지만 특정 경우에 대해서도 마찬가지입니다. 이 작업 속도를 늦출 수있는 다른 작업이 있습니까? – Nikhil