@Override
protected void startReading(final ReadableByteChannel channel)
throws IOException {
lineReader = new LineReader(channel);
if (lineReader.readNextLine()) {
final String headerLine = lineReader.getCurrent().trim();
header = headerLine.split(",");
readingStarted = true;
}
}
및 후자의 경우, 현재 행의 데이터에 t : 나는 빠른 (전체) 솔루션을 구현했습니다
@Override
protected boolean readNextRecord() throws IOException {
if (!lineReader.readNextLine()) {
return false;
}
final String line = lineReader.getCurrent();
final String[] data = line.split(",");
// assumes all lines are valid
final StringBuilder record = new StringBuilder();
for (int i = 0; i < header.length; i++) {
record.append(header[i]).append(":").append(data[i]).append(", ");
}
currentRecord = record.toString();
return true;
}
, github 볼 수 있습니다. 나는 또한 입증하는 흐름 단위 테스트를 추가 읽기 :
@Test
public void test_reading() throws Exception {
final File file =
new File(getClass().getResource("/sample.csv").toURI());
assertThat(file.exists()).isTrue();
final Pipeline pipeline = TestPipeline.create();
final PCollection<String> output =
pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));
DataflowAssert
.that(output)
.containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");
pipeline.run();
}
곳 sample.csv
내용은 다음과 같습니다
a,b,c
1,2,3
4,5,6
당신이 자바 나 파이썬에서 필요합니까? – vdolez