Flink DataStream 유형이 DataStream[(String, somecaseclass)]
입니다. String
인 Tuple
의 첫 번째 필드에 그룹화하여 ListBuffer[somecaseclass]
을 만들고 싶습니다.키로 레코드를 그룹화하고 Flink 스트리밍에서 ListBuffer로 수집하십시오.
val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
.keyBy(0)
.fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}
그러나 10 입력 행, 첫 번째 출력 행이 바로 첫 번째 행에 연결됩니다이있는 경우에만, 열 번째 행이 나를을주는 의미 나에게 각 행에 대한 출력을 제공합니다 아래에 내가 시도 것입니다 10 개의 행에 연결된 출력. 그러나 나는 열 번째 행만 원할 것이다. 거의 모든 변환을 Flink DataStream
에서 확인했지만 유스 케이스에는 적합하지 않습니다.
입력 :
(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))
예상 출력 :
이(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))
자세한 답변을 주셔서 감사합니다. 나는 창을 확인하고 돌아올 것이다. –
결과를 얻기 위해 텀블링 창을 사용했습니다. (outputbuffer), b) => {(b._1, outputbuffer._2 +)}} keyBy (0) .window (TumblingProcessingTimeWindows.of (Time.seconds (3)) .fold = b._2)}}. 다시 한번 감사드립니다. –