2016-08-29 2 views
2

나는 Scala를 사용하여 Apache Flink를 시작했습니다. 누군가 내가 가지고있는 현재의 데이터 스트림에서 느린 스트림 (k 이벤트 또는 k 단위의 시간 지연)을 만드는 방법을 알려주시겠습니까?Apache Flink : 지연된 Datastream 만들기

기본적으로 데이터 스트림에서 자동 회귀 모델 (시간 지연 버전의 선형 선형 회귀 모델)을 구현하고 싶습니다. 따라서 다음의 의사 코드와 비슷한 방법이 필요합니다. 모든 이벤트를 1 개 초 간격으로 이격 된 경우

val ds : DataStream = ... 

val laggedDS : DataStream = ds.map(lag _) 

def lag(ds : DataStream, k : Time) : DataStream = { 

} 

은이 같은 샘플의 입력 및 출력을 기대하고 2초 래그가있다.

입력 : 1, 2, 3, 4, 5, 6, 7 ...
출력 : NA, NA, 1, 2, 3, 4, 5 ...

+0

질문을 연장하고 느린 스트림의 의미를 설명 할 수 있습니까? 감사합니다. –

+1

@FabianHueske, 지연된 데이터 스트림으로 생각합니다. 그는 데이터 저장소에있는 요소를 평소보다 늦게 가져 오는 것을 의미합니다. 예를 들어 1 분 지연은 스트림에 도착한 시간보다 1 분 늦은 시간에 요소를 방출합니다. –

+1

질문은 "k 이벤트에 비해 뒤지지"며 "x 분만큼 뒤떨어져 있지 않다"고 말합니다. 하나의 해석은 k 이벤트의 FIFO 큐에 새로운 이벤트를 추가하고 새로운 이벤트가 도착할 때 큐 헤드 요소를 전달합니다. 원하는 의미론을 명확하게 정의하지 않으면 질문에 답할 수 없습니다. –

답변

4

을 감안할 때 나는 당신의 요구 사항을 올바르게 구현하려면 FIFO 큐가있는 FlatMapFunction으로 구현해야합니다. 큐는 k 이벤트를 버퍼링하고 새로운 이벤트가 도착할 때마다 헤드를 방출합니다. 내결함성 스트리밍 응용 프로그램이 필요한 경우 대기열을 상태로 등록해야합니다. 그런 다음 Flink는 상태 (즉, 대기열)의 검사 점을 처리하고 오류가 발생하면이를 복원합니다.

FlatMapFunction는 다음과 같이 수 : 시간 지연과

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{ 

    var fifo: mutable.Queue[X] = new mutable.Queue[X]() 

    override def flatMap(value: X, out: Collector[X]): Unit = { 
    // add new element to queue 
    fifo.enqueue(value) 
    if (fifo.size == k + 1) { 
     // remove head element and emit 
     out.collect(fifo.dequeue()) 
    } 
    } 

    // restore state 
    override def restoreState(state: mutable.Queue[X]) = { fifo = state } 

    // get state to checkpoint 
    override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo 

} 

돌아 요소가 더 복잡하다. 새 요소가 도착할 때만 함수가 호출되기 때문에 방출을위한 타이머 스레드가 필요합니다.

+0

정말 고마워요! 이것은 위대합니다. :) – Kauchy