2017-04-25 9 views
0

을 생산하고 있습니다. 나는 다음과 같은 출력받을하둡 맵리 듀스 독특한 패턴이 나는 독특한 패턴을 구현하기 위해 노력하고있어 중복 키

context.write(key, new IntWtitable(key.hashCode()); 

: 나는에 방출하면 키와 해시 감소 이론적으로

key1 -1808937256 
key2 -768063202 
key3 906064410 
key2 -768063202 
key3 906064410 

을 출력 단지 내가 key1, key2key3을 때죠 포함해야 HashPartitioner : 동일한 해시 코드를 갖는 키를 사용하여 같은 파티션에 결합됩니다. 이것은 분명히 여기에 해당하지 않습니다.

나는 Text 객체로 내 복잡한 Writable 변환 (그에 따라 매퍼/감속기 클래스를 적용) 및 Mapper에 방출하고있어 경우

context.write(new Text(key.toString()), NullWritable.get()); 

이 ... 출력이 예상과 같이

key1 1013632023 
key2 762485389 
key3 -1193948769 

확인을 클릭하고 동작을 보여주는 최소 작동 예제입니다.

입력 :

A A A A A 
B B B B B 
C C C C C 
A A A A A 
B B B B B 

맵리 듀스 작업 :

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.ArrayWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class DistinctPattern extends Configured implements Tool { 
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> { 


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
     ComplexObject o = new ComplexObject(value.toString()); 
     context.write(o, NullWritable.get()); 
    } 
} 

public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> { 


    public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context) 
      throws IOException, InterruptedException { 

     context.write(key, new IntWritable(key.hashCode())); 
    } 
} 

public static class MyArrayWritable extends ArrayWritable { 

    public MyArrayWritable(Writable[] values) { 
     super(DatumObject.class, values); 
    } 

    public MyArrayWritable() { 
     super(DatumObject.class); 
    } 

    @Override 
    public String toString() { 
     return Arrays.toString(get()); 
    } 

} 

public static class DatumObject implements Writable { 
    private String datum; 

    public DatumObject() {} 

    public DatumObject(String d) { 
     datum = d; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     datum = in.readUTF(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeUTF(datum);  
    } 

    @Override 
    public String toString() { 
     return datum; 
    } 

    @Override 
    public int hashCode() { 
     return 31 * datum.hashCode(); 
    } 

} 

public static class ComplexObject implements WritableComparable<ComplexObject> { 
    private List<DatumObject> data = new ArrayList<>(); 

    public ComplexObject() {} 

    public ComplexObject(String d) { 
     String[] elements = d.split(" "); 
     for(int i = 0; i < elements.length; i++) 
      data.add(new DatumObject(elements[i])); 
    } 

    public int size() { 
     return data.size(); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     data.clear(); 
     MyArrayWritable m = new MyArrayWritable(); 
     m.readFields(in); 
     Writable[] w = m.get(); 
     for(int i = 0; i < w.length; i++) 
      data.add((DatumObject) w[i]); 

    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()])); 
     m.write(out); 
    } 

    @Override 
    public int compareTo(ComplexObject o) { 
     if(this.equals(o)) 
      return 0; 

     if(o.size() < this.size()) 
      return -1; 

     return 1; 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if(!(obj instanceof ComplexObject)) 
      return false; 

     ComplexObject other = (ComplexObject) obj; 
     return other.data.equals(data); 
    } 

    @Override 
    public int hashCode() { 
     return 31 * data.hashCode(); 
    } 

    @Override 
    public String toString() { 
     StringBuilder s= new StringBuilder(); 
     data.forEach(entry -> { 
      s.append(entry); 
      s.append(" "); 
     }); 

     return s.toString(); 
    } 

} 

@Override 
public int run(String[] args) throws Exception { 
    Job job = Job.getInstance(); 
    job.setJar("distinct.jar"); 
    job.setJarByClass(DistinctPattern.class); 
    job.setMapperClass(DistinctMapper.class); 
    job.setReducerClass(DistinctReducer.class); 
    job.setMapOutputKeyClass(ComplexObject.class); 
    job.setMapOutputValueClass(NullWritable.class); 
    job.setOutputKeyClass(ComplexObject.class); 
    job.setOutputValueClass(IntWritable.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception {  
    int exitCode = ToolRunner.run(new DistinctPattern(), args); 
    System.exit(exitCode); 
} 
} 

예상 출력 :

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 

실제 출력 :

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 
A A A A A  368623362 
B B B B B  1285710467 

무엇이 누락 되었습니까?

PS : 하둡 2.7.3

답변

0

좋아, 내 코드에서 실수 (들)을 발견했다. 첫째, 최소한의 작업 예제는 클래스 DatumObject에서 equals 메소드의 구현을 결여 :

@Override 
public boolean equals(Object obj) { 
    if(obj == null) 
     return false; 

    if(!(obj instanceof DatumObject)) 
     return false; 

    DatumObject other = (DatumObject) obj; 
     return other.datum.equals(datum); 
} 

둘째, 나는, 최소한의 작업의 예에서 재현하지만 내 실제 코드에 나타나는 수없는 한 측면이있다 모든의 내 key 클래스는 WritableComparable 인터페이스를 구현했습니다. 결과적으로 셔플 단계가 예상대로 키를 정렬하지 않았을 것으로 판단됩니다. 내 key 값 (see class diagram here)을 구성하는 모든 클래스에서 compareTo 메서드를 올바르게 구현하면 고유 한 패턴이 예상대로 작동합니다.