2017-05-16 5 views
4

Apache 빔 파이프 라인에서 고정 창을 사용하려고합니다 (DirectRunner 사용). 다음과 같이 우리의 흐름은 다음과 같습니다 하위 술집에서아파치 빔 윈도우 - 고정 윈도우 닫지 않는 것 같습니까?

  1. 풀 데이터를/
  2. , 각각의 창을 결합하여 직렬화 JSON 자바 객체로
  3. 윈도우 이벤트 사용자 정의 CombineFn을 사용하여 오초
  4. 의 고정 창 승/
  5. 테스트를 위하여를 들어 List<Event>
  6. Event S, 단순히 출력 List<Event>
을 수득

파이프 라인 코드 :

pipeline 
       // Read from pubsub topic to create unbounded PCollection 
       .apply(PubsubIO 
        .<String>read() 
        .topic(options.getTopic()) 
        .withCoder(StringUtf8Coder.of()) 
       ) 

       // Deserialize JSON into Event object 
       .apply("ParseEvent", ParDo 
        .of(new ParseEventFn()) 
       ) 

       // Window events with a fixed window size of 5 seconds 
       .apply("Window", Window 
        .<Event>into(FixedWindows 
         .of(Duration.standardSeconds(5)) 
        ) 
       ) 

       // Group events by window 
       .apply("CombineEvents", Combine 
        .globally(new CombineEventsFn()) 
        .withoutDefaults() 
       ) 

       // Log grouped events 
       .apply("LogEvent", ParDo 
        .of(new LogEventFn()) 
       ); 

우리가보고있는 결과는 우리가 어떤 로깅을하지 않는 한 최종 단계는 실행되지 않습니다 것입니다.

또한 사용자 정의 CombineFn 클래스의 각 메소드에 System.out.println("***")을 추가하여 실행시기를 추적하고 실행되지 않는 것으로 보입니다.

여기서 창 설정이 잘못 되었습니까? 우리는 https://beam.apache.org/documentation/programming-guide/#windowing에있는 예제를 따라 갔고 상당히 단순 해 보였지만 분명히 빠진 것이 있습니다.

모든 통찰력을 환영합니다 - 미리 감사드립니다!

+0

질문에 대답하려면 네, 여기 창 설정이 올바르게 설정되어 있어야합니다. 'ParseEventFn()'에 간단히 인쇄하여 Pub-Sub를 통해 엘리먼트가 있는지 확인하십시오. 중요한 점은 파이프 라인을 실제로 실행하기 위해'pipeline.run()'을 사용 했습니까? 아무것도 말 안했어. –

+0

죄송합니다. 더 많은 컨텍스트를 제공 했어야합니다. 이미'ParseEventFn'에 이벤트를 출력하고 있습니다. 파이프 라인을 실행하기 위해'pipeline.run()'도 사용하고 있습니다. 나는 방아쇠를 추가했는데 이것은 좋은 소식 인 이벤트를 방출하는 창을 일으키고 있습니다 - 창을 여는 것은 이벤트 시간 (처리 시간이 아닌)에 기반하고 윈도우는 결과를 방출 할시기를 알지 못합니다/oa 방아쇠. – cstaikos

답변

4

주요 문제가 실제로 실종 트리거였던 것처럼 보입니다 - 창문이 열리고 결과를 내 보내야 할 때 아무 것도 표시되지 않았습니다. 우리는 처리 시간 (없는 이벤트 시간) 그래서 다음 않았다 기반으로 간단하게 윈도우에 원 :

.apply("Window", Window 
    .<Event>into(new GlobalWindows()) 
    .triggering(Repeatedly 
     .forever(AfterProcessingTime 
      .pastFirstElementInPane() 
      .plusDelayOf(Duration.standardSeconds(5)) 
     ) 
    ) 
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes() 
) 

는 기본적으로이 첫 번째 요소가 처리 된 후 이벤트를 5 초 방출하는 트리거 글로벌 창을 만듭니다. 창을 닫을 때마다 다른 요소가 수신되면 다른 창이 열립니다. 우리가 withAllowedLateness 조각을 가지고 있지 않았을 때 빔은 불평했다 - 내가 알고있는 한, 이것은 단지 늦은 데이터를 무시하도록 말한다.

여기에 약간의 이해가있을 수 있지만 위의 스 니펫이 문제를 해결했습니다.

+0

50 초 간격의 목적은 무엇입니까? 목표가 단지 N 초마다 데이터를 내보내는 것이라면 언급 한 처리 시간 트리거와 함께 GlobalWindow를 사용할 수 있어야합니다. –

+0

감사합니다. @BenChambers!그 중 하나 나처럼 보이지 않았다 - 당신의 제안을 시도하고 작동 :) 내 대답을 업데이 트됩니다 – cstaikos

+1

귀하의 창 설정은 여전히 ​​정확하고 완전히 다른 계산. 5 초 이내에 발생한 사건에 관심이 있습니까? 이것은 윈도 잉입니다. 글로벌 집계를 처리하는 동안 매 5 초마다 증분 결과를 얻으시겠습니까? 이것은 글로벌 윈도우 + 트리거링입니다. 이전 동작의 일반적인 원인은 요소의 타임 스탬프로 인해 발생하는 워터 마크가 증가하지 않는 것입니다. –