flatMapGroupsWithState
을 적용하는 중에 다음 오류 메시지가 나타납니다. 스레드에서flatMapGroupsWithState를 사용하여 상태 저장 집계를 수행하는 방법은 무엇입니까?
예외 "주요"org.apache.spark.sql.AnalysisException : 업데이트 모드에서 flatMapGroupsWithState는 스트리밍 DataFrame/데이터 집합에 집계가 지원되지 않습니다;
다음은 내가하려는 일입니다. 카프카에서
- 읽기 메시지
- 모든 그룹 UDAF을 실행하고, 각 그룹에 대한 집합을 계산 특정 기준에 기초하여 그
- 그룹을 분석 &. Agg는 KeyValueGroupDataSet를 반환하지 않습니다. 따라서 이전 단계 출력에
groupByKey
을 aggFunction 열을 기반으로 그룹에 적용하십시오. - 이 집계를
flatMapGroupsWithState
을 사용하여 이전 상태의 스트림으로 병합하십시오.
마지막 단계에 대한 오류 메시지가 나타납니다.
agg(....)
을 데이터 세트에 적용한 후에 flatMapGroupsWithState
을 적용 할 수 없다는 뜻입니까?
안녕하세요 Jacek, 시간을내어 주셔서 감사합니다. flatMapGroupWithState (after가 아님)을 적용하기 전에 agg (...) 함수를 실행 중입니다. 또한 flatMapGroupWithState는 업데이트 작업을 지원합니다. KeyValueGroupedDataset에서 if (outputMode! = OutputMode.Append && outputMode! = OutputMode.Update) {throw new IllegalArgumentException ("함수의 출력 모드가 append 또는 update 여야 함)}} – user9108941