2017-12-17 20 views
1

flatMapGroupsWithState을 적용하는 중에 다음 오류 메시지가 나타납니다. 스레드에서flatMapGroupsWithState를 사용하여 상태 저장 집계를 수행하는 방법은 무엇입니까?

예외 "주요"org.apache.spark.sql.AnalysisException : 업데이트 모드에서 flatMapGroupsWithState는 스트리밍 DataFrame/데이터 집합에 집계가 지원되지 않습니다;

다음은 내가하려는 일입니다. 카프카에서

  • 읽기 메시지
  • 모든 그룹 UDAF을 실행하고, 각 그룹에 대한 집합을 계산 특정 기준에 기초하여 그
  • 그룹을 분석 &. Agg는 KeyValueGroupDataSet를 반환하지 않습니다. 따라서 이전 단계 출력에 groupByKey을 aggFunction 열을 기반으로 그룹에 적용하십시오.
  • 이 집계를 flatMapGroupsWithState을 사용하여 이전 상태의 스트림으로 병합하십시오.

마지막 단계에 대한 오류 메시지가 나타납니다.

agg(....)을 데이터 세트에 적용한 후에 flatMapGroupsWithState을 적용 할 수 없다는 뜻입니까?

답변

0

agg(....)을 데이터 세트에 적용한 후에 flatMapGroupsWithState을 적용 할 수 없다는 뜻입니까?

아니요. 그것은/데이터 집합

기본 출력 모드를 사용하는 것이 의미 업데이트 모드에서

flatMapGroupsWithState는 스트리밍 DataFrame에 집계 지원되지 않습니다 (강조 광산)라고 말한다update이지만, Spark 공식 설명서 Output Modes ("query with flatMapGroupsWithState"쿼리 유형 참조)에 설명 된대로 complete 또는 append이어야합니다.

enter image description here

+0

안녕하세요 Jacek, 시간을내어 주셔서 감사합니다. flatMapGroupWithState (after가 아님)을 적용하기 전에 agg (...) 함수를 실행 중입니다. 또한 flatMapGroupWithState는 업데이트 작업을 지원합니다. KeyValueGroupedDataset에서 if (outputMode! = OutputMode.Append && outputMode! = OutputMode.Update) {throw new IllegalArgumentException ("함수의 출력 모드가 append 또는 update 여야 함)}} – user9108941