2016-07-06 5 views
1

FlinkCEP를 사용하여 패턴에 대해서만 '게으른'일치를 원합니다. 어떻게해야합니까? 예 : 나는 ACABCABCB 입력 스트림을 가지고 있으며, A matchesB 다음에 C와 일치하여 3 개의 일치 만 얻고 6 개의 일치는 얻지 못합니다.Flink CEP와의 게으른 일치를 할 수 있습니까

내 문제를 설명하기 위해 다음 예제를 만들었습니다.

val env = StreamExecutionEnvironment.createLocalEnvironment(1) 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) 

case class MyEvent(id: Int, kind: String, value: String) 
case class MyAggregatedEvent(id: Int, concatenatedValue: String) 

val eventStream = env.fromElements(
    MyEvent(1, "A", "1"), MyEvent(1, "C", "1"), 
    MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"), 
    MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"), 
    MyEvent(1, "B", "3") 
) 

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .next("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern) 

val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect { 
    (pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) => 
    val partA = pattern.get("pA").get 
    val partC = pattern.get("pC").get 

    collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value)) 
} 
outNextStream.print() 

env.execute("Experiment") 

이 나에게 다음과 같은 출력을 제공한다 :

MyAggregatedEvent (1,1 => 1)

I는 상기 패턴을 변경하는 경우 : 다음

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .followedBy("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

다음이 인쇄됩니다.

,

MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,1 => 2)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,1 => 3)
MyAggregatedEvent (1,2- = > 3)
MyAggregatedEvent (1,3 => 3) 내 출력이 될 수 있도록 한 번만 각 이벤트에 맞는 패턴을 만들 수있는 방법

:

MyAggregatedEvent을 (1,1 => 1)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,3 => 3)

답변

1

현재 Flink의 CEP 라이브러리에서는이 기능을 지원하지 않습니다. 일치하는 의미를 아직 제어 할 수 없습니다. 나는 처음에는 MATCH_ALL과 일치하는 MATCH_FIRST 모드를 추가하는 것이 좋을 것이라고 생각합니다. MATCH_FIRST은 완전히 일치하는 시퀀스를 보았 으면 모든 중간 상태를 삭제합니다. 이것은 유스 케이스를 다루어야한다.