0

다음 작업을 수행 할 것 Google 클라우드 데이터 흐름의 파이프 라인 구축을 위해 노력하고있는 파일에서 읽기 :Google 클라우드 데이터 흐름 : 동적 파일 이름

  • 이 Pubsub 가입에 대한 이벤트를 수신을
  • 은에서 파일 이름을 추출 이벤트 텍스트
  • (Google 클라우드 스토리지 버킷에서) 파일을 읽어

의 BigQuery에서

  • 스토어 기록 , 두 번째 단계의 출력을 액세스하고 3에서 사용하는 방법을 아주 확실하지

    Pipeline pipeline = //create pipeline 
    pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub")) 
         .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>) 
         .apply(TextIO.read().from(""))??? 
    

    나는 3 단계를 고민하고있다 : 다음은 코드입니다. 다음을 생성하는 코드를 작성하려고 시도했습니다.

    그러나 후속 단계에서 파일 내용을 읽을 수 없습니다.

    누구나 제 3 단계 및 4 단계에서 작성해야 파일을 한 줄씩 소비하여 BigQuery에 출력하거나 저장할 수 있습니다.

  • 답변

    2

    읽기를 표현하는 자연스러운 방법은 입력 파일 PCollection에서 파일 이름을 읽는 TextIO.readAll() 메서드를 사용하는 것입니다. 이 메소드는 Beam 코드베이스에 도입되었지만 현재 릴리스 된 버전은 아닙니다. Beam 2.2.0 릴리스 및 해당 Dataflow 2.2.0 릴리스에 포함됩니다.

    -1

    이 작업은 SerializableFunction을 사용하여 수행 할 수 있습니다.

    당신은 생성자 인수로이 클래스의 인스턴스를 생성하는 동안 정적 버킷 이름과 다른 매개 변수를 전달할 수 있습니다

    pipeline.apply(TextIO.read().from(new FileNameFn())); 
    
    public class FileNameFn implements SerializableFunction<inputFileNameString, outputQualifiedFileNameStringWithBucket> 
    

    명백한 할 수 있습니다.

    희망이 도움이 될 것입니다.

    +0

    참조하는 메서드가 존재하지 않습니다. TextIO.read(). from()은 String 또는 ValueProvider 에만 적용됩니다. write()의보다 동적 인 메소드와 혼동했을 수도있다. – jkff