Apache 빔 파이프 라인에서 고정 창을 사용하려고합니다 (DirectRunner
사용). 다음과 같이 우리의 흐름은 다음과 같습니다 하위 술집에서아파치 빔 윈도우 - 고정 윈도우 닫지 않는 것 같습니까?
- 풀 데이터를/ , 각각의 창을 결합하여 직렬화 JSON 자바 객체로
- 윈도우 이벤트 사용자 정의
CombineFn
을 사용하여 오초 - 의 고정 창 승/
- 테스트를 위하여를 들어
List<Event>
- 에
Event
S, 단순히 출력List<Event>
파이프 라인 코드 :
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
우리가보고있는 결과는 우리가 어떤 로깅을하지 않는 한 최종 단계는 실행되지 않습니다 것입니다.
또한 사용자 정의 CombineFn
클래스의 각 메소드에 System.out.println("***")
을 추가하여 실행시기를 추적하고 실행되지 않는 것으로 보입니다.
여기서 창 설정이 잘못 되었습니까? 우리는 https://beam.apache.org/documentation/programming-guide/#windowing에있는 예제를 따라 갔고 상당히 단순 해 보였지만 분명히 빠진 것이 있습니다.
모든 통찰력을 환영합니다 - 미리 감사드립니다!
질문에 대답하려면 네, 여기 창 설정이 올바르게 설정되어 있어야합니다. 'ParseEventFn()'에 간단히 인쇄하여 Pub-Sub를 통해 엘리먼트가 있는지 확인하십시오. 중요한 점은 파이프 라인을 실제로 실행하기 위해'pipeline.run()'을 사용 했습니까? 아무것도 말 안했어. –
죄송합니다. 더 많은 컨텍스트를 제공 했어야합니다. 이미'ParseEventFn'에 이벤트를 출력하고 있습니다. 파이프 라인을 실행하기 위해'pipeline.run()'도 사용하고 있습니다. 나는 방아쇠를 추가했는데 이것은 좋은 소식 인 이벤트를 방출하는 창을 일으키고 있습니다 - 창을 여는 것은 이벤트 시간 (처리 시간이 아닌)에 기반하고 윈도우는 결과를 방출 할시기를 알지 못합니다/oa 방아쇠. – cstaikos