2013-09-05 4 views
6

시퀀스 파일을 읽는 추가 사용자 지정 InputFormat을 만들 수 있지만 레코드가있는 해당 파일 내에서 파일 경로와 오프셋을 노출합니다.SequenceFileInputFormat 파일 이름 + 오프셋을 포함하도록 확장합니다.

한 걸음 뒤로 물러나려면 다음과 같은 유스 케이스가 있습니다. 가변 크기의 데이터가 포함 된 시퀀스 파일이 있습니다. 키는 대부분 관련성이 없으며 값은 다양한 필드가 포함 된 최대 2 메가 바이트입니다. 나는 elasticsearch에서 이러한 필드 중 일부를 파일 이름과 오프셋과 함께 색인화하려고합니다. 이렇게하면 elasticsearch에서 해당 필드를 쿼리 한 다음 파일 이름과 오프셋을 사용하여 전체 파일을 ES에 저장하는 대신 시퀀스 파일로 돌아가서 원본 레코드를 가져올 수 있습니다.

나는이 모든 과정을 하나의 자바 프로그램으로 작업한다. SequenceFile.Reader 클래스는 편리하게 이것을 실현하기 위해 getPositionseek 메서드를 제공합니다.

그러나 결국에는 수 테라 바이트의 데이터가 포함되므로이 데이터를 MapReduce 작업 (아마도 맵 전용)으로 변환해야 할 것입니다. 시퀀스 파일의 실제 키는 관계가 없기 때문에 내가 취하고 자하는 접근 방식은 SquenceFileInputFormat을 확장하거나 어떻게 사용 하던지, 실제 키를 반환하는 대신에 파일로 구성된 복합 키를 반환하는 사용자 지정 InputFormat을 만드는 것입니다 및 오프셋.

그러나 실제로는 더 어려워졌습니다. 그것은 가능한 것처럼 보이지만, 실제 API와 노출 된 것이 주어지면 까다 롭습니다. 어떤 아이디어? 어쩌면 내가 취해야 할 다른 방법일까요?

답변

5

누구나 비슷한 문제가 발생하는 경우, 여기 나와있는 해결책이 있습니다. SequenceFileInputFormat/RecordReader의 일부 코드를 복제하고 수정하는 것으로 끝났습니다. 나는 서브 클래스 또는 장식 또는 뭔가 하나를 작성하는 희망했다 ...이 방법은 꽤 아니지만, 그것은 작동합니다

SequenceFileOffsetInputFormat.java :

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { 

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { 

     private SequenceFile.Reader in; 
     private long start; 
     private long end; 
     private boolean more = true; 
     private PathOffsetWritable key = null; 
     private Writable k = null; 
     private V value = null; 
     private Configuration conf; 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
      FileSplit fileSplit = (FileSplit) split; 
      conf = context.getConfiguration(); 
      Path path = fileSplit.getPath(); 
      FileSystem fs = path.getFileSystem(conf); 
      this.in = new SequenceFile.Reader(fs, path, conf); 
      try { 
       this.k = (Writable) in.getKeyClass().newInstance(); 
       this.value = (V) in.getValueClass().newInstance(); 
      } catch (InstantiationException e) { 
       throw new IOException(e); 
      } catch (IllegalAccessException e) { 
       throw new IOException(e); 
      } 
      this.end = fileSplit.getStart() + fileSplit.getLength(); 

      if (fileSplit.getStart() > in.getPosition()) { 
       in.sync(fileSplit.getStart()); 
      } 

      this.start = in.getPosition(); 
      more = start < end; 

      key = new PathOffsetWritable(path, start); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (!more) { 
       return false; 
      } 
      long pos = in.getPosition(); 

      more = in.next(k, value); 
      if (!more || (pos >= end && in.syncSeen())) { 
       key = null; 
       value = null; 
       more = false; 
      } else { 
       key.setOffset(pos); 
      } 
      return more; 
     } 

     @Override 
     public PathOffsetWritable getCurrentKey() { 
      return key; 
     } 

     @Override 
     public V getCurrentValue() { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (end == start) { 
       return 0.0f; 
      } else { 
       return Math.min(1.0f, (in.getPosition() - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void close() throws IOException { 
      in.close(); 
     } 

    } 

    @Override 
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new SequenceFileOffsetRecordReader<V>(); 
    } 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 
     return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); 
    } 

    @Override 
    public long getFormatMinSplitSize() { 
     return SequenceFile.SYNC_INTERVAL; 
    } 


} 

PathOffsetWritable.java :

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { 

    private Text t = new Text(); 
    private Path path; 
    private long offset; 

    public PathOffsetWritable(Path path, long offset) { 
     this.path = path; 
     this.offset = offset; 
    } 

    public Path getPath() { 
     return path; 
    } 

    public long getOffset() { 
     return offset; 
    } 

    public void setPath(Path path) { 
     this.path = path; 
    } 

    public void setOffset(long offset) { 
     this.offset = offset; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     t.readFields(in); 
     path = new Path(t.toString()); 
     offset = in.readLong(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     t.set(path.toString()); 
     t.write(out); 
     out.writeLong(offset); 
    } 

    @Override 
    public int compareTo(PathOffsetWritable o) { 
     int x = path.compareTo(o.path); 
     if (x != 0) { 
      return x; 
     } else { 
      return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); 
     } 
    } 


}