2010-05-16 3 views
1

DDL에서 생성 된 클래스를 사용하여 데이터를 읽고 쓰는 기본 작업 흐름을 통해 나를 걸을 수 있습니까?Hadoop/MapReduce : DDL에서 생성 된 클래스 읽기 및 쓰기

DDL을 사용하여 구조체와 유사한 레코드를 정의했습니다. 예를 들면 다음과 같습니다.

class Customer { 
    ustring FirstName; 
    ustring LastName; 
    ustring CardNo; 
    long LastPurchase; 
    } 

고객 클래스를 얻고이를 내 프로젝트에 포함 시켰습니다. 나는 그것을 mappers와 reducers (생성 된 클래스가 Writable을 구현)에 대한 입력과 출력으로 사용하는 방법을 쉽게 볼 수 있지만 파일을 읽고 쓰는 방법은 볼 수 없다.

org.apache.hadoop.record 패키지의 JavaDoc에서는 이러한 레코드를 이진, CSV 또는 XML 형식으로 serialize하는 방법에 대해 설명합니다. 실제로 어떻게합니까? 내 감속기가 IntWritable 키와 고객 값을 생성한다고 가정 해보십시오. 결과를 CSV 형식으로 쓰려면 어떤 OutputFormat을 사용해야합니까? 필자가 분석을 수행하고 싶다면 나중에 InputFormat을 사용하여 결과 파일을 읽어야합니까?

답변

1

좋아, 그래서 내가 알아 냈어. 가장 단순한 방법인지 확실하지 않습니다. 더 간단한 작업 흐름을 알고 있다면 수정하십시오.

DDL에서 생성 된 모든 클래스

는 기록 인터페이스를 구현하고, 결과적으로 두 가지 방법을 제공합니다

RecordOutput

를 읽는 (에서 RecordInput) 직렬화를 작성하기위한

직렬화 (RecordOutput 아웃)RecordInputorg.apache.hadoop.record 패키지에 제공된 유틸리티 인터페이스입니다. 몇 가지 구현이 있습니다 (예를 들어 XMLRecordOutput, BinaryRecordOutput, CSVRecordOutput)

지금까지 내가 아는 한, 당신이 당신의 자신의 OutputFormat 또는 InputFormat 클래스 이것들을 사용하기를 구현해야합니다. 이것은 상당히 쉽습니다.

는 예를 들어, 내가 (CSV 형식으로 정수 키와 고객 가치를 기록 한) 원래의 질문에 대한 얘기 OutputFormat은 다음과 같이 구현 될 것이다 :


    private static class CustomerOutputFormat 
    extends TextOutputFormat<IntWritable, Customer> 
    { 

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored, 
     JobConf job, 
     String name, 
     Progressable progress) 
    throws IOException { 
     Path file = FileOutputFormat.getTaskOutputPath(job, name); 
     FileSystem fs = file.getFileSystem(job); 
     FSDataOutputStream fileOut = fs.create(file, progress); 
     return new CustomerRecordWriter(fileOut); 
    } 

    protected static class CustomerRecordWriter 
     implements RecordWriter<IntWritable, Customer> 
    { 

     protected DataOutputStream outStream ; 

     public AnchorRecordWriter(DataOutputStream out) { 
     this.outStream = out ; 
     } 

     public synchronized void write(IntWritable key, Customer value) throws IOException { 

     CsvRecordOutput csvOutput = new CsvRecordOutput(outStream); 
     csvOutput.writeInteger(key.get(), "id") ; 
     value.serialize(csvOutput) ; 
     } 

     public synchronized void close(Reporter reporter) throws IOException { 
     outStream.close(); 
     } 
    } 
    } 

InputFormat 만들기 거의 동일합니다. csv 형식은 한 줄에 하나의 항목이기 때문에 대부분의 작업을 수행하기 위해 내부적으로 LineRecordReader를 사용할 수 있습니다.



private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> { 

    public RecordReader<IntWritable, Customer> getRecordReader(
    InputSplit genericSplit, 
    JobConf job, 
    Reporter reporter) 
    throws IOException { 

    reporter.setStatus(genericSplit.toString()); 
    return new CustomerRecordReader(job, (FileSplit) genericSplit); 
    } 

    private class CustomerRecordReader implements RecordReader<IntWritable, Customer> { 

    private LineRecordReader lrr ; 

    public CustomerRecordReader(Configuration job, FileSplit split) 
    throws IOException{ 
     this.lrr = new LineRecordReader(job, split);  
    } 

    public IntWritable createKey() { 
     return new IntWritable(); 
    } 

    public Customer createValue() { 
     return new Customer(); 
    } 

    public synchronized boolean next(IntWritable key, Customer value) 
    throws IOException { 

     LongWritable offset = new LongWritable() ; 
     Text line = new Text() ; 

     if (!lrr.next(offset, line)) 
     return false ; 

     CsvRecordInput cri = new CsvRecordInput(new  
     ByteArrayInputStream(line.toString().getBytes())) ; 
     key.set(cri.readInt("id")) ; 
     value.deserialize(cri) ; 

     return true ; 
    } 

    public float getProgress() { 
     return lrr.getProgress() ; 
    } 

    public synchronized long getPos() throws IOException { 
     return lrr.getPos() ; 
    } 

    public synchronized void close() throws IOException { 
     lrr.close(); 
    } 
    } 
}