2016-12-23 5 views
3

나는 CSV 파일이 있으며 미리 열 이름을 알지 못합니다. Google 데이터 흐름에서 일부 변형 후에 JSON에서 데이터를 출력해야합니다.데이터 흐름이있는 CSV 헤더 읽기

머리글 행을 가져 와서 모든 행에 레이블을 침투시키는 가장 좋은 방법은 무엇입니까? 예를 들어

:

a,b,c 
1,2,3 
4,5,6 

는 ... (약)이됩니다 : 당신은 첫 번째 줄에 저장 헤더 데이터를 읽을 것, 사용자 정의 (TextIO.TextSource 유사) FileBasedSource를 구현해야

{a:1, b:2, c:3} 
{a:4, b:5, c:6} 
+0

당신이 자바 나 파이썬에서 필요합니까? – vdolez

답변

5

@Override 
    protected void startReading(final ReadableByteChannel channel) 
    throws IOException { 
     lineReader = new LineReader(channel); 

     if (lineReader.readNextLine()) { 
      final String headerLine = lineReader.getCurrent().trim(); 
      header = headerLine.split(","); 
      readingStarted = true; 
     } 
    } 

및 후자의 경우, 현재 행의 데이터에 t : 나는 빠른 (전체) 솔루션을 구현했습니다

@Override 
    protected boolean readNextRecord() throws IOException { 
     if (!lineReader.readNextLine()) { 
      return false; 
     } 

     final String line = lineReader.getCurrent(); 
     final String[] data = line.split(","); 

     // assumes all lines are valid 
     final StringBuilder record = new StringBuilder(); 
     for (int i = 0; i < header.length; i++) { 
      record.append(header[i]).append(":").append(data[i]).append(", "); 
     } 

     currentRecord = record.toString(); 
     return true; 
    } 

, github 볼 수 있습니다. 나는 또한 입증하는 흐름 단위 테스트를 추가 읽기 :

@Test 
public void test_reading() throws Exception { 
    final File file = 
      new File(getClass().getResource("/sample.csv").toURI()); 
    assertThat(file.exists()).isTrue(); 

    final Pipeline pipeline = TestPipeline.create(); 

    final PCollection<String> output = 
      pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath()))); 

    DataflowAssert 
      .that(output) 
      .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, "); 

    pipeline.run(); 
} 

sample.csv 내용은 다음과 같습니다

a,b,c 
1,2,3 
4,5,6 
+0

그것은 여전히 ​​최신 아파치 빔 버전과 호환됩니까? – vdolez