2017-12-19 23 views
1

간단한 예제를 실행하여 EventTime을 기준으로 창을 테스트합니다. 처리 시간으로 출력을 생성 할 수 있지만 EventTime을 사용할 때 출력이 나오지 않습니다. 내가 뭘 잘못하고 있는지 이해하도록 도와주세요.Flink 스트리밍 이벤트 시간 창

5 초마다 슬라이드하는 크기가 10 초인 SlidingWindow를 만들고 창 끝에는 그 시간 동안받은 메시지 수가 표시됩니다.

input : 
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695856 (generated at 16th second, received at 19th second) 
a,1513695859 (generated at 13th second, received at 19th second) 

두번째 필드는 13, 13, 16 분 19 초를 나타내는 이벤트의 타임 스탬프를 나타냅니다.

if i am using Processing Time window : 

Output : 
(a,1) 
(a,3) 
(a,2) 

그러나 이벤트 시간을 사용할 때보 다 출력이 출력되지 않습니다. 무슨 일이 일어나고 있는지 이해하도록 도와주세요.

package org.apache.flink.window.training; 

import java.io.InputStream; 
import java.util.Properties; 

import org.apache.flink.api.common.functions.FoldFunction; 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.api.java.functions.KeySelector; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; 
import org.apache.flink.streaming.api.watermark.Watermark; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class SocketStream { 


    private static Properties properties = new Properties(); 

    public static void main(String args[]) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    InputStream inputStream = 
     SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties"); 

    properties.load(inputStream); 

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    FlinkKafkaConsumer010<String> consumer = 
     new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties); 

    DataStream<Element> socketStockStream = 
     env.addSource(consumer).map(new MapFunction<String, Element>() { 
      @Override 
      public Element map(String value) throws Exception { 

      String split[] = value.split(","); 
      Element element = new Element(split[0], Long.parseLong(split[1])); 

      return element; 
      } 
     }).assignTimestampsAndWatermarks(new TimestampExtractor()); 

    socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() { 

     @Override 
     public Tuple2<String, Integer> map(Element value) throws Exception { 

     return new Tuple2<String, Integer>(value.getId(), 1); 
     } 
    }).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)) 
    .sum(1). 
    print(); 

    env.execute(); 
    } 

    public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> { 

    private static final long serialVersionUID = 1L; 

    @Override 
    public long extractTimestamp(Element element, long previousElementTimestamp) { 

     return element.getTimestamp(); 
    } 

    @Override 
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) { 
     // TODO Auto-generated method stub 
     return null; 
    } 
    } 
} 

답변

4

이벤트 타임 처리가 제대로 timestamps and watermarks 생성이 필요합니다.

코드의 TimestampExtractor은 워터 마크를 생성하지 않지만 항상 null을 반환합니다.

+0

Thnks Fabian은 워터 마크 할당을 사용하여 문제없이 작동하지만 ..... 워터 마크를 null로 할당하면 여전히 궁금해합니다. 워터 마크 (t)가 일어나는 이유는 't가없는 요소가 없어야한다는 것입니다.' 그래서 그것의 null 인 경우, 모든 요소를 ​​임의의 시각 t로 받아 들여, 출력을 생성 할 필요가 있습니다. –

+1

'null'을 반환하면 워터 마크가 업데이트되지 않습니다. 따라서 워터 마크는 항상 'Long.MIN_VALUE'에 유지되고 절대로 진행되지 않으므로 창을 계산할 수 없습니다. –