2017-02-17 5 views
0

데이터 흐름 스트리밍 파이프 라인으로 출력 DoFn에서 파일을 읽을 :흐름이 라인의 흐름

GCS의 일부 소스 업로드 압축 파일 -> 업로드 이벤트 : PubSub로 전송 (GS ///folder/file.gz) -> PubSub I에서 데이터 흐름 스트리밍 읽기 파일 이벤트/O -> DoFn 유엔 Gzip으로는

static class CustomDoFn extends DoFn<String, String>{ 

@Override 
public void processElement(ProcessContext c) throws Exception { 
    String gcsPath = c.element(); 
    Open ReadChannel with GCS 
    Get Stream from Channel 
    while((line = stream.ReadLine()) != null){ 
     c.output(line) // Is this good way to read and send line down the pipeline? 
    } 
} 

// 파이프 라인은-수

pipeline.apply(PubSubIO.Read()). 
      apply(ParDO.of(new CustomDoFn())). 
      apply(new CustomTX()). 
      apply(BigQueryIO.Write()); 

의심은 다음과 같습니다
1.하는 것이 올바른 방법입니다 DoFn에서 루프 출력을 생성합니까?
2. Dofn 내부에서 FileBasedSource.FileBasedReader을 어떻게 사용할 수 있습니까?

+0

안녕하세요, 귀하의 질문을 이해하고 있는지 확인하고 싶습니다. 파일에서 읽는 스트리밍 파이프 라인을 만들고 싶습니까? 그리고 Pub/Sub를 사용하여 파일 이름을 받고이를 읽고 싶습니까? 파일 크기가 매우 큽니까? 파일을 읽고 각 행을 출력하는 방식의 한 가지 문제는 파일 전체가 실제로 출력되기 전에 메모리에 읽혀 져야한다는 것입니다. 대용량 파일의 경우 이는 작동하지 않으며 OOM 일 수 있습니다. –

답변

0

현재 FileBasedSource를 동적 파일 이름 (즉, 파이프 라인 구성시 지정되지 않은 파일 이름)과 함께 사용할 방법이 없습니다. Apache Beam 2.0 (https://issues.apache.org/jira/browse/BEAM-65)의 향후 개선으로이 기능을 사용할 수 있지만 아직 사용할 준비가되지 않았습니다. Alex 아마토 (Alex Amato)가 지적했듯이, 당신의 윤곽이 잡힌 접근법은 대용량 파일에 대해 메모리 제약을 받지만 그렇지 않으면 기능하는 파이프 라인을 만들어야합니다.

+0

더 구체적인 JIRA가이 사용 사례를 위해 제출되었습니다. https://issues.apache.org/jira/browse/BEAM-2511 TextIO는 다음과 같은 PCollection 읽기를 지원해야합니다. 파일 이름 – jkff