윈도우 기간으로 구조화 된 스트리밍 집계를 수행하고 싶습니다. 다음과 같은 데이터 스키마가 제공됩니다. 목표는 사용자를 기반으로 최신 발생 이벤트를 기준으로 필터링하는 것입니다. 그런 다음 각 위치에 대한 각 이벤트 유형의 수를 집계하십시오.Spark Structured Streaming - 최신 및 집계 카운트로 중복 제거하는 방법
time location user type
1 A 1 one
2 A 1 two
1 B 2 one
2 B 2 one
1 A 3 two
1 A 4 one
샘플 출력 :
location countOne countTwo
A 1 2
B 1 0
같은 다음
val aggTypes = df
.select($"location", $"time", $"user", $"type")
.groupBy($"user")
.agg(max($"timestamp") as 'timestamp)
.select("*")
.withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
.agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
.drop($"window")
으로 구조화 된 스트리밍 스트리밍 DataFrames에서 지원되지 않는 여러 집계 및 비 시간 기반의 윈도우를 지원하지 않습니다/Datasets. 1 스트리밍 쿼리에서 원하는 출력을 얻을 수 있는지 확실하지 않습니다.
도움을 주시면 감사하겠습니다.