1

윈도우 기간으로 구조화 된 스트리밍 집계를 수행하고 싶습니다. 다음과 같은 데이터 스키마가 제공됩니다. 목표는 사용자를 기반으로 최신 발생 이벤트를 기준으로 필터링하는 것입니다. 그런 다음 각 위치에 대한 각 이벤트 유형의 수를 집계하십시오.Spark Structured Streaming - 최신 및 집계 카운트로 중복 제거하는 방법

time location user type 
1  A   1  one 
2  A   1  two 
1  B   2  one 
2  B   2  one 
1  A   3  two 
1  A   4  one 

샘플 출력 :

location countOne countTwo 
    A   1   2 
    B   1   0 

같은 다음

val aggTypes = df 
    .select($"location", $"time", $"user", $"type") 
    .groupBy($"user") 
    .agg(max($"timestamp") as 'timestamp) 
    .select("*") 
    .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds") 
    .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location") 
    .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo))) 
    .drop($"window") 

으로 구조화 된 스트리밍 스트리밍 DataFrames에서 지원되지 않는 여러 집계 및 비 시간 기반의 윈도우를 지원하지 않습니다/Datasets. 1 스트리밍 쿼리에서 원하는 출력을 얻을 수 있는지 확실하지 않습니다.

도움을 주시면 감사하겠습니다.

답변

0

Stateless Aggregations를하려는 것처럼 보입니다. https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)

flatMapGroups는 데이터 집합의 각 그룹에 함수를 적용하는 집계 API입니다. 그룹화 된 dataset.flatMapGroups에서만 사용할 수 있으며 셔플 오버 헤드가 증가하는 부분 집계는 지원하지 않습니다. 따라서이 API는 메모리에 적합한 작은 배치 집계를 수행 할 때만 사용하십시오. 또한 reduce 함수 또는 Aggregator를 사용하는 것이 좋습니다. https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html

val count = words.groupByKey(x => x) 
      .flatMapGroups 
      { 
       case (x, iterator) ⇒ Iterator((x, iterator.length)) 
       }.toDF("x", "count")   


count.writeStream.format("console").outputMode(OutputMode.Append())