EMR에서 스파크 작업 (버전 2.1.1)을 실행할 때 각 실행은 데이터 프레임에서 다른 양의 행을 계산합니다. 먼저 s3에서 4 개의 다른 데이터 프레임으로 데이터를 읽었습니다.이 개수는 데이터 프레임에 합류 한 후 항상 일치합니다. 조인의 결과는 서로 다릅니다. 이후 나는 또한 결과를 필터링하고 각 실행마다 다른 카운트를 갖는다. 변형은 1 ~ 5 행 차이가 작지만 이해할 수있는 부분입니다. Spark DataFrame 행 개수가 실행 사이에 일치하지 않습니다.
이
을위한 코드 인 조인val impJoinKey = Seq("iid", "globalVisitorKey", "date")
val impressionsJoined: DataFrame = impressionDsNoDuplicates
.join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer")
.join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left")
.join(chartSiteInstance, impJoinKey, "left")
.withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam"))
.withColumn("url", coalesce($"realUrl", $"url"))
하고 이것은 필터이다
val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache()
I 대신 여기서하지만 동일한 결과
와의 필터 방법을 사용하여 시도아무 생각하세요?
감사 Nir 씨
자세히 설명해 주시겠습니까 _ "각 실행은 데이터 프레임에서 다른 양의 행을 계산합니다"_? 소스가 일치합니까? 예 : "s3에서 데이터 읽기"_ 실행간에 동일한 결과가 나타 납니까? _ "4 개의 다른 데이터 프레임"_에 대한 행 수가 실행간에 일관성이 있는지 확인할 수 있습니까? 데이터 프레임을 읽고 사용하기 전에 카운트를 인쇄하십시오. –
예, S3에서 읽는 DF의 행 수는 실행간에 항상 일치합니다. 조인 이후의 행 수만 일치하지 않습니다. –
코드 수를 포함하여 모든 코드를 표시 할 수 있습니까? 또한 필터 함수에서 사용되는'impressionsJoinedFullDay'는 무엇입니까? 그것은'impressionsJoined'와 다른가요 ?? –