2017-10-22 5 views
1

EMR에서 스파크 작업 (버전 2.1.1)을 실행할 때 각 실행은 데이터 프레임에서 다른 양의 행을 계산합니다. 먼저 s3에서 4 개의 다른 데이터 프레임으로 데이터를 읽었습니다.이 개수는 데이터 프레임에 합류 한 후 항상 일치합니다. 조인의 결과는 서로 다릅니다. 이후 나는 또한 결과를 필터링하고 각 실행마다 다른 카운트를 갖는다. 변형은 1 ~ 5 행 차이가 작지만 이해할 수있는 부분입니다. Spark DataFrame 행 개수가 실행 사이에 일치하지 않습니다.

을위한 코드 인 조인

val impJoinKey = Seq("iid", "globalVisitorKey", "date") 

val impressionsJoined: DataFrame = impressionDsNoDuplicates 
    .join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer") 
    .join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left") 
    .join(chartSiteInstance, impJoinKey, "left") 
    .withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam")) 
    .withColumn("url", coalesce($"realUrl", $"url")) 

하고 이것은 필터이다

val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache() 

I 대신 여기서하지만 동일한 결과

와의 필터 방법을 사용하여 시도

아무 생각하세요?

감사 Nir 씨

+2

자세히 설명해 주시겠습니까 _ "각 실행은 데이터 프레임에서 다른 양의 행을 계산합니다"_? 소스가 일치합니까? 예 : "s3에서 데이터 읽기"_ 실행간에 동일한 결과가 나타 납니까? _ "4 개의 다른 데이터 프레임"_에 대한 행 수가 실행간에 일관성이 있는지 확인할 수 있습니까? 데이터 프레임을 읽고 사용하기 전에 카운트를 인쇄하십시오. –

+0

예, S3에서 읽는 DF의 행 수는 실행간에 항상 일치합니다. 조인 이후의 행 수만 일치하지 않습니다. –

+0

코드 수를 포함하여 모든 코드를 표시 할 수 있습니까? 또한 필터 함수에서 사용되는'impressionsJoinedFullDay'는 무엇입니까? 그것은'impressionsJoined'와 다른가요 ?? –

답변

0

는 데이터 소스 중 하나는 시간이 지남에 따라 변화 가능성이있다?
impressionsJoined은 캐시되지 않으므로 spark는 모든 작업에 대해 처음부터 다시 평가하고 소스에서 데이터를 다시 읽는 것을 포함합니다.

조인 후에 impressionsJoined 캐싱을 시도하십시오.

+0

s3에서 읽은 DF가 변경되지 않는 DF의 수를 인쇄하는 것을 볼 수 있으므로 S3의 데이터는 변경되지 않습니다. 또한 DF가 이미 필터 앞에 캐시되어 있습니다. –

+1

기술적으로 DF의 동일한 카운트는 실행 중 행 수가 동일하다는 것을 알려주며 데이터가 동일하다는 것을 보장하지 않습니다. 데이터를 완전히 제어 할 수없는 경우 데이터 사본을 가져 와서 s3의 다른 위치에 저장하십시오. –