2017-12-07 5 views
0

슬라이딩 창을 사용하여 EvenTime에서 이벤트를 처리하고 싶습니다. 슬라이딩 간격은 24 시간이고 증가분은 30 분입니다. 문제는 아래 코드가 각 이벤트에 대해 48 개의 계산을 생성하고 있다는 것입니다. 우리의 경우 이벤트가 순서대로오고 있으므로 최신 윈도우 만 평가하면됩니다.이벤트 시간 기반 슬라이딩 창에 대한 최신 창만 확인하십시오.

감사합니다,

데얀

public static void processEventsa(
     DataStream<Tuple2<String, MyEvent>> events) throws Exception { 

    events.assignTimestampsAndWatermarks(new MyWatermark()). 
      keyBy(0). 
      timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds)). 
      apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() { 
       @Override 
       public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input, 
              Collector<Tuple2<String, MyEvent>> out) throws Exception { 

        for (Tuple2<String, MyEvent> record : input) { 



        } 
       } 
      }); 
} 

public class MyWatermark implements 
     AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> { 

    @Override 
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     return event.f1.eventTime; 
    } 

    @Override 
    public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     return new Watermark(event.f1.eventTime); 
    } 
} 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

답변

0

이 문제는 워터 마크했다. 지정자와 함께 사용되는주기 표기는 사용되어야합니다.

public class MyWatermark implements 
     AssignerWithPeriodicWatermarks<Tuple2<String, MyEvent>> { 

    private final long maxTimeLag = 5000; 

    @Override 
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     try { 
      return event.f1.eventTime; 
     } 
     catch(NullPointerException ex) {} 

     return System.currentTimeMillis() - maxTimeLag; 
    } 

    @Override 
    public Watermark getCurrentWatermark() { 
     return new Watermark(System.currentTimeMillis() - maxTimeLag); 
    } 
}