좋아, 그래서 내가 알아 냈어. 가장 단순한 방법인지 확실하지 않습니다. 더 간단한 작업 흐름을 알고 있다면 수정하십시오.
직렬화를 작성하기위한 직렬화 (RecordOutput 아웃)를 RecordInput은 org.apache.hadoop.record 패키지에 제공된 유틸리티 인터페이스입니다. 몇 가지 구현이 있습니다 (예를 들어 XMLRecordOutput, BinaryRecordOutput, CSVRecordOutput)
지금까지 내가 아는 한, 당신이 당신의 자신의 OutputFormat 또는 InputFormat 클래스 이것들을 사용하기를 구현해야합니다. 이것은 상당히 쉽습니다.
는 예를 들어, 내가 (CSV 형식으로 정수 키와 고객 가치를 기록 한) 원래의 질문에 대한 얘기 OutputFormat은 다음과 같이 구현 될 것이다 :
private static class CustomerOutputFormat
extends TextOutputFormat<IntWritable, Customer>
{
public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new CustomerRecordWriter(fileOut);
}
protected static class CustomerRecordWriter
implements RecordWriter<IntWritable, Customer>
{
protected DataOutputStream outStream ;
public AnchorRecordWriter(DataOutputStream out) {
this.outStream = out ;
}
public synchronized void write(IntWritable key, Customer value) throws IOException {
CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
csvOutput.writeInteger(key.get(), "id") ;
value.serialize(csvOutput) ;
}
public synchronized void close(Reporter reporter) throws IOException {
outStream.close();
}
}
}
InputFormat 만들기 거의 동일합니다. csv 형식은 한 줄에 하나의 항목이기 때문에 대부분의 작업을 수행하기 위해 내부적으로 LineRecordReader를 사용할 수 있습니다.
private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {
public RecordReader<IntWritable, Customer> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new CustomerRecordReader(job, (FileSplit) genericSplit);
}
private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {
private LineRecordReader lrr ;
public CustomerRecordReader(Configuration job, FileSplit split)
throws IOException{
this.lrr = new LineRecordReader(job, split);
}
public IntWritable createKey() {
return new IntWritable();
}
public Customer createValue() {
return new Customer();
}
public synchronized boolean next(IntWritable key, Customer value)
throws IOException {
LongWritable offset = new LongWritable() ;
Text line = new Text() ;
if (!lrr.next(offset, line))
return false ;
CsvRecordInput cri = new CsvRecordInput(new
ByteArrayInputStream(line.toString().getBytes())) ;
key.set(cri.readInt("id")) ;
value.deserialize(cri) ;
return true ;
}
public float getProgress() {
return lrr.getProgress() ;
}
public synchronized long getPos() throws IOException {
return lrr.getPos() ;
}
public synchronized void close() throws IOException {
lrr.close();
}
}
}