2016-06-15 6 views
1

나는 kafka -> flink -> elastic search를 사용하여 java에서 poc 프로젝트를 진행하고 있습니다.상태 변경 이벤트를 기반으로 분산 된 방식으로 얼마나 많은 "클라이언트"가 플립크 상태에 있는지를 계산하는 방법은 무엇입니까? 상태 유지 객체가 필요합니다.

On 카프카에서는 특정 주제와 같이 0에서 최대 수천 회의 이벤트까지 예측할 수없는 이벤트가 생성됩니다. 나는 10 개 주를

{"stateA":54, "stateB":100, ... "stateJ":34} 

: [Created, ... , Deleted]을 15 분 평균 수명주기와

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."} 

FLINK이 이벤트를 소비하고 제 2 탄성에 모든 싱크 각 상태의 예에서 이벤트의 수를 검색한다 . 상태는 초당 두 번 바뀔 수 있습니다. 이론적으로 새로운 상태가 추가 될 수 있습니다. FLINK의 시간 창 https://flink.apache.org/news/2015/12/04/Introducing-windows.html

문제를 사용하는 모든 두 번째 내가 생각하고 스트림을 싱크하기 위해

내가 수를 감소/증가 할 수에 대한 guid->previous-statestateX->count 위해 정보와 상태 객체를 필요로한다는 것입니다 새 이벤트가 발생할 때

은 내가 FLINK 스트림 처리에 새로운 해요 https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

, 나는 아직 FLINK 상태 스트림 처리 파고하지 않은 상태 증기 처리에 대한 초안 문서를 발견했다. 첫 번째 단계에서는 정적 객체를 사용하려고 생각하지만 여러 flink 인스턴스가 시작될 때이 방법은 작동하지 않습니다.

내가 물어보고 싶은 :

  1. 당신이이 방법에 대해 어떻게 생각하십니까?
  2. 이러한 종류의 스트림 처리에는 flink가 적합합니까?
  3. 이 문제를 해결하기위한 귀하의 접근 방법은 무엇입니까?

또한 windowed stateful 스트림 솔루션 (또는 다른 솔루션)에 대한 일부 코드 단편을 고맙게 생각합니다.

감사합니다.

답변

1

어떨까요?

15 분 동안의 창을 사용하고 창 상태가 정리됩니다. 또한 매 초마다 창을 평가하는 사용자 정의 트리거를 사용합니다. 지금까지 윈도우 작업이 진행되는 동안 각 guid에 대한 최신 상태와 (상태, 1) 튜플을 방출하는 WindowFunction을 유지하는 ReduceFunction이 있습니다. 우리는이 상태로 열쇠를 둡니다. 나는 이것이 당신에게 당신이 찾고있는 결과를 줄 것이라고 생각합니다. 다음

val env = StreamExecutionEnvironment.getExecutionEnvironment() 
val stream = env.addSource(new FlinkKafkaProducer(...)) 

val results = stream 
    .keyBy(_.guid) 
    .timeWindow(Time.minutes(15)) 
    .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000)) 
    .apply(
    (e1, e2) => e2, 
    (k, w, i, c: Collector[(String, Long)]) => { 
     if (i.head != null) c.collect((i.head.state, 1)) 
    } 
) 
    .keyBy(0) 
    .timeWindow(Time.seconds(1)) 
    .sum(1) 
    .addSink(new ElasticsearchSink<>(...)) 

env.execute("Count States") 

ProcessingTimeTriggerWithPeriodicFirings 정의된다 :

object ProcessingTimeTriggerWithPeriodicFirings { 
    def apply(intervalMs: Long) = { 
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs) 
    } 
} 

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long) 
    extends Trigger[Event, TimeWindow] { 

    private val startTimeDesc = 
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L) 

    override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    val startTime = ctx.getPartitionedState(startTimeDesc) 
    if (startTime.value == 0) { 
     startTime.update(window.getStart) 
     ctx.registerProcessingTimeTimer(window.getEnd) 
     ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs) 
    } 
    TriggerResult.CONTINUE 
    } 

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    if (time == window.getEnd) { 
     TriggerResult.PURGE 
    } 
    else { 
     ctx.registerProcessingTimeTimer(time + intervalMs) 
     TriggerResult.FIRE 
    } 
    } 

    override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    TriggerResult.CONTINUE 
    } 
}