2016-09-19 5 views
0

Flink CEP 라이브러리를 사용하여 Hello 및 world가 발견되면 문자열을 인쇄하려고합니다. 내 소스는 카프카 (Kafka)이며 콘솔 생산자를 사용하여 데이터를 입력합니다. 그 부분이 효과가 있습니다. 내가 주제에 입력 한 것을 인쇄 할 수 있습니다. 그러나, 그것은 나의 최종 메시지를 인쇄하지 않을 것이다 "세계는 매우 멋지다!". 그것이 람다에 들어갔다는 것을 인쇄하지 않을 것입니다. 아래는 클래스입니다Flink CEP No Results 인쇄 됨

package kafka; 

import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 
import org.apache.flink.util.Collector; 

import java.util.Map; 
import java.util.Properties; 

/** 
* Created by crackerman on 9/16/16. 
*/ 
public class WordCount { 

public static void main(String[] args) throws Exception { 

    Properties properties = new Properties(); 
    properties.put("bootstrap.servers", "localhost:9092"); 
    properties.put("zookeeper.connect", "localhost:2181"); 
    properties.put("group.id", "test"); 
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); 
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString", 
                     new SimpleStringSchema(), 
                     properties)); 

    src.print(); 


    Pattern<String, String> pattern = Pattern.<String>begin("first") 
      .where(evt -> evt.contains("Hello")) 
      .followedBy("second") 
      .where(evt -> evt.contains("World")); 

    PatternStream<String> patternStream = CEP.pattern(src, pattern); 

    DataStream<String> alerts = patternStream.flatSelect(
      (Map<String, String> in, Collector<String> out) -> { 
       System.out.println("Made it to the lambda"); 
       String first = in.get("first"); 
       String second = in.get("second"); 
       System.out.println("First: " + first); 
       System.out.println("Second: " + second); 

       if (first.equals("Hello") && second.equals("World")) { 

        out.collect("The world is so nice!"); 
       } 


      }); 

    alerts.print(); 

    see.execute(); 
} 

} 

어떤 도움을 주시면 대단히 감사하겠습니다.

감사합니다!

답변

0

문제는 내가 그것을에 예상되는 방식으로 작동, 다음 줄이 제거되면

see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

입니다.

+1

타임 스탬프와 워터 마크를 할당 했습니까? 이벤트 시간 모드는 각 레코드의 시간과 경과 시간을 Flink에 알려주는 경우에만 작동합니다. 이 정보가 없으면 Flink는 이벤트 시간에 데이터를 처리 할 수 ​​없습니다. 데이터가 도착하지 못하거나 시간 기반 작업 (예 : 창)이있는 경우 일관된 결과를 얻으려면 이벤트 시간이 필요합니다. –

+1

타임 스탬프와 워터 마크를 지정하지 않았습니다. 나는 돌아가서 이것을 구현했다. 정보 주셔서 감사합니다! – Crackerman