1

다음은 in_date 및 out_date 직원을 나타내는 샘플 데이터 세트입니다. 모든 직원의 마지막 in_time을 확보해야합니다.정렬 - 정렬을 통한 sortWithInPartitions

4 개의 노드 독립 실행 형 클러스터에서 Spark가 실행 중입니다.

초기 데이터 집합 :

직원 ID ----- 안양 후 in_date ----- out_date

1111111  2017-04-20 2017-09-14 
1111111  2017-11-02 null 
2222222  2017-09-26 2017-09-26 
2222222  2017-11-28 null 
3333333  2016-01-07 2016-01-20 
3333333  2017-10-25 null 

데이터 집합. 종류 (골 (in_date) .desc()) :

직원 ID - in_date ----- out_date

1111111 2017-11-02 null 
1111111 2017-04-20 2017-09-14 
2222222 2017-09-26 2017-09-26 
2222222 2017-11-28 null 
3333333 2017-10-25 null 
3333333 2016-01-07 2016-01-20 

df.dropDup (직원 ID) :
출력 :

직원 ID ----- in_date ----- out_date

1111111 2017-11-02 null 
2222222 2017-09-26 2017-09-26 
3333333 2016-01-07 2016-01-20 

예상 데이터 집합 :

직원 ID ----- in_date ----- out_date

1111111 2017-11-02 null 
2222222 2017-11-28 null 
3333333 2017-10-25 null 

하지만 때 내가 sortWithInPartitions에 초기 데이터 집합을 분류하고 내가 예상 데이터 집합을 가지고 deduped. 크고 작은 것이 없습니다. 어떤 도움을 주셔서 감사합니다.

추가 정보 : 로컬 모드에서 spark를 사용하여 df.sort를 실행했을 때 예상되는 출력이 달성되었습니다.
나는 어떤 종류의 파티션도하지 않았으며, 파티션을 다시 나누었다. 초기 데이터 세트는 기본 Cassandra 데이터베이스에서 가져옵니다.

답변

2

TL : DR 명시 적으로 보장하지 않는 한, 특히 Spark SQL을 사용하여 작업 할 때 Spark에서의 작업이 특정 순서로 실행되지 않는다고 가정해서는 안됩니다.

여기에 누락 된 것은 셔플입니다.

  • 부분 (맵 "측")를 집약 :로 실행되는

    df.groupBy(idCols).agg(first(c) for c in nonIdCols) 
    

    : dropDuplicates 구현은 동일하다.

  • 셔플.
  • 최종 ("축소 측") 집계.

중급 셔플은 비 결정 성을 도입하며 최종 집계가 특정 순서로 적용된다는 보장이 없습니다.

df.sort가 로컬 모드에서 Spark로 실행될 때 예상되는 출력이 달성되었습니다.

local 모드는 상당히 단순합니다. 완전 분산 모드에서 Spark 내부 동작에 대한 결론을 도출하는 데 절대로 사용해서는 안됩니다.

내가 sortWithInPartitions를 사용하여 초기 데이터 집합을 정렬하고 중복 될 때 예상 데이터 집합을 얻었습니다.

데이터가 이전에 EmployeeID으로 분할 된 경우 이는 의미가 있습니다. 이 경우 스파크는 추가 셔플을 필요로하지 않습니다.

설명에 따르면 How to select the first row of each group?에 표시된 솔루션 중 하나를 사용해야합니다.