0
슬라이딩 창을 사용하여 EvenTime에서 이벤트를 처리하고 싶습니다. 슬라이딩 간격은 24 시간이고 증가분은 30 분입니다. 문제는 아래 코드가 각 이벤트에 대해 48 개의 계산을 생성하고 있다는 것입니다. 우리의 경우 이벤트가 순서대로오고 있으므로 최신 윈도우 만 평가하면됩니다.이벤트 시간 기반 슬라이딩 창에 대한 최신 창만 확인하십시오.
감사합니다,
데얀
public static void processEventsa(
DataStream<Tuple2<String, MyEvent>> events) throws Exception {
events.assignTimestampsAndWatermarks(new MyWatermark()).
keyBy(0).
timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds)).
apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input,
Collector<Tuple2<String, MyEvent>> out) throws Exception {
for (Tuple2<String, MyEvent> record : input) {
}
}
});
}
public class MyWatermark implements
AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> {
@Override
public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
return event.f1.eventTime;
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
return new Watermark(event.f1.eventTime);
}
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);