2017-10-30 15 views
1

파이프 라인을 설정했습니다. * .gz 파일을 수백 가지 파싱해야합니다. 따라서 glob은 아주 잘 작동합니다.Apache Beam TextIO glob 원래 파일 가져 오기

그러나 결과 파일을 원본 파일로 지정하고 싶기 때문에 현재 처리중인 파일의 원래 이름이 필요합니다.

아무도 나를 도와 줄 수 있습니까?

여기 내 코드입니다.

@Default.String(LOGS_PATH + "*.gz") 
String getInputFile(); 
void setInputFile(String value); 


    TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile()); 
     read.getName(); 

     p.apply("ReadLines", read).apply(new CountWords()) 
     .apply(MapElements.via(new FormatAsTextFn())) 
     .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput())); 

    p.run().waitUntilFinish(); 

답변

2

이것은 텍스트의 라인을 읽을 FileIO.match(), FileIO.read() 커스텀 코드의 조합을 사용하여 2.2로 시작 빔있다. 이미 HEAD에서 이것을 사용할 수 있습니다. 또는 릴리스 2.2가 완료 될 때까지 기다릴 수 있습니다 (현재 진행 중입니다).

PCollection<KV<String, String>> filesAndLines = 
    p.apply(FileIO.match().filepattern(...)) 
    .apply(FileIO.read()) 
    .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() { 
    @ProcessElement 
    public void process(ProcessContext c) { 
     ReadableFile f = c.element(); 
     String filename = f.getMetadata().resourceId().toString(); 
     String line; 
     try (BufferedReader r = new BufferedReader(Channels.newInputStream(f.open()))) { 
     while ((line = r.readLine()) != null) { 
      c.output(KV.of(filename, line)); 
     } 
     } 
    } 
    })); 
+0

위대한! 감사! 나를 많이 도왔다. –