2016-12-07 3 views
0

두 이벤트가 Flink로 스트리밍되는 경우, 다음 이벤트 (DataStream API 또는 CEP 사용)의 정보를 사용하여 논리적으로 조인 될 수 있습니까? 예 : 아래 예제의 세 번째 이벤트는 right_id 및 left_id를 기반으로 처음 두 이벤트를 연결하는 데 사용할 수 있습니까?세 번째 이벤트를 기반으로 Flink에서 두 이벤트 연결

ID: AAAA 
ID: BBBB 
ID: ZZZZ, right_id: AAAA, left_id: BBBB 

답변

0

이것은 매우 기본적인 CEP 사용 사례입니다. 코드는 다음과 같습니다.

// data stream creation 
DataStream<Event> myStream = ... 

// cep pattern definition 
Pattern<Event, ?> myPattern = Pattern.<Event>begin("first_event") 
       .followedBy("second_event") 
       .followedBy("match_event"); 

// cep pattern stream: apply pattern to stream 
PatternStream<Event> myPatternStream = CEP.pattern(myStream, myPattern); 

// create new data stream from pattern matches 
DataStream<CEPEvent> myCEPEvent = myPatternStream.flatSelect(
       (Map<String, Event> pattern, Collector<CEPEvent> out) -> { 

       // load potential event sequence matches 
       Event first_event = pattern.get("first_event"); 
       Event second_event = pattern.get("second_event"); 
       Event match_event = pattern.get("match_event"); 

       // test event sequences 
       if (match_event.right_id.equals(first_event.ID) 
        && match_event.left_id.equals(second_event.ID) 
       ){out.collect(new CEPEvent("successful cep hit"));} 
      } 
     );