2017-12-28 19 views
2

Flink DataStream 유형이 DataStream[(String, somecaseclass)]입니다. StringTuple의 첫 번째 필드에 그룹화하여 ListBuffer[somecaseclass]을 만들고 싶습니다.키로 레코드를 그룹화하고 Flink 스트리밍에서 ListBuffer로 수집하십시오.

val emptylistbuffer = new ListBuffer[somecaseclass] 
inputstream 
    .keyBy(0) 
    .fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}} 

그러나 10 입력 행, 첫 번째 출력 행이 바로 첫 번째 행에 연결됩니다이있는 경우에만, 열 번째 행이 나를을주는 의미 나에게 각 행에 대한 출력을 제공합니다 아래에 내가 시도 것입니다 10 개의 행에 연결된 출력. 그러나 나는 열 번째 행만 원할 것이다. 거의 모든 변환을 Flink DataStream에서 확인했지만 유스 케이스에는 적합하지 않습니다.

입력 :

(filename1.dat,somecaseclass("abc","1",2)) 
(filename1.dat,somecaseclass("dse","2",3)) 
(filename1.dat,somecaseclass("daa","1",4)) 

예상 출력 :

(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4))) 

답변

1

데이터 스트림 API는 억제 할 수없는이 될 수있는 DataStream을 고려합니다. 즉, DataStream은 무한 수의 레코드를 제공 할 수 있습니다. 따라서 집계해야 할 레코드가 더 많을 수 있으므로 (ListBuffer에 추가됨) 모든 레코드가 수신 된 후에 집계 결과 (귀하의 경우에는 전체 ListBuffer)를 "방금"배출 할 수 없습니다. 원칙적으로 DataStream의 집계는 더 많은 레코드가 이어질 수 있으므로 최종 결과를 생성 할 수 없습니다. 이것은별로 실용적이지 않으므로 Flink의 DataStream API는 들어오는 각 레코드에 대해 새로운 결과를 생성합니다.

무제한 스트림에서 집계를 계산하는 일반적인 방법은 창입니다. Windows는 집계를 계산할 수있는 스트림의 경계 섹션을 정의하고 최종 결과를 방출합니다. Flink는 시간 또는 레코드 수를 기반으로 내장 된 윈도우를 제공합니다. 예를 들어, 1 시간의 텀블링 창에서 레코드 수집 기능을 사용하면 1 시간 이내에 도착한 모든 레코드를 수집합니다.

different window types의 Flink 설명서와 사용 방법을 확인하십시오.

+0

자세한 답변을 주셔서 감사합니다. 나는 창을 확인하고 돌아올 것이다. –

+0

결과를 얻기 위해 텀블링 창을 사용했습니다. (outputbuffer), b) => {(b._1, outputbuffer._2 +)}} keyBy (0) .window (TumblingProcessingTimeWindows.of (Time.seconds (3)) .fold = b._2)}}. 다시 한번 감사드립니다. –