2012-07-31 1 views
9

많은 MapReduce 프로그램에서 결합기로 사용되는 감속기를 볼 수 있습니다. 나는 이것이 그 프로그램의 특별한 본질 때문이라는 것을 안다. 그러나 그들이 다를 수 있는지 궁금합니다.결합기 및 감속기가 다를 수 있습니까?

답변

22

예, 결합기는 여전히 Reducer 인터페이스를 구현하지만 결합기는 Reducer와 다를 수 있습니다. 조합원은 직업에 따라 달라질 특정 경우에만 사용할 수 있습니다. Combiner는 감속기처럼 작동하지만 각 매퍼의 키/값 출력의 하위 집합에서만 작동합니다.

Reducer와 달리 결합 자의 제한 조건은 입력/출력 키 및 값 유형 이 매퍼의 출력 유형과 일치해야한다는 것입니다.

+1

해야 하는가? – user1170330

+1

정말 유용한 링크입니다. 결합자를 사용해야하는시기를 설명합니다. http://www.philippeadjiman.com/blog/2010/01/14/hadoop-tutorial-series-issue-4-to-use-or-not-to-use-a-combiner / – mk2

7

그래, 그들은 확실히 다를 수 있지만, 나는 당신이 예기치 않은 결과를 얻을 것이라고 주로 다른 수업을 사용하고 싶지 않아요.

Combiners는 교환 가능 (a.b = b.a) 및 연관 {a. (b.c) = (a.b) .c} 인 함수에서만 사용할 수 있습니다. 이는 또한 결합 자 (combiner)가 키와 값의 하위 집합에서만 작동하거나 전혀 실행하지 않을 수도 있음을 의미합니다. 그래도 프로그램 출력이 동일하게 유지되기를 원합니다.

다른 논리로 다른 클래스를 선택하면 논리 출력이 제공되지 않을 수도 있습니다.

+0

이것은 결합기가 로컬 감속기이라고 할 수 허용 대답 – mk2

0

컴의 주요 목표는/최적화 가능한 한 가장 대역폭을 절약하는 것이 맵퍼 및 감속기 사이의 네트워크를 통해 단행 할 합니다 키 값 쌍의 수를 최소화하는 것입니다.

결합기의 엄지 손가락 법칙, 그것은 동일한 입력 및 출력 변수 타입, 그 이유 을 가지고 갖는다는 또는 부피 및 번호를 따라 사용될 수있다, 결합기 사용이 보장되지이고 유출.

감속기는이 규칙, 즉 동일한 입력 및 출력 변수 유형을 만족할 때 결합기로 사용될 수 있습니다.

결합 자에 대한 다른 가장 중요한 규칙은 이 적용 할 수있는 기능이 교환 가능하고 연관성이있는 경우에만 사용할 수 있다는 것입니다. 숫자를 추가하는 것과 같습니다. 그러나 평균과 같은 경우는 아닙니다 (감속기와 동일한 코드를 사용하는 경우).

귀하의 질문에 답하기 위해 물론 다를 수 있습니다. 귀하의 감속기가 다른 유형의 입력 및 출력 변수를 가지고있을 때 선택의 여지가 없지만 다른 감속기 코드 사본을 만들고 수정하려면 .

ur가 감속기의 논리에 관심이 있다면 다른 방법으로도 구현할 수 있습니다. 결합기의 경우 결합기에 오는 모든 값의 로컬 버퍼를 갖도록 컬렉션 개체를 가질 수 있습니다 감속기의 경우에는 결합기보다 메모리에서 벗어나는 경향이 있기 때문에 감속기에서 사용하는 것보다 위험이 적습니다. 다른 논리 차이가 확실히 존재할 수 있으며 그렇게합니다.

2

구현은 다음과 같습니다. 결합 자 및 결합 자없이 실행할 수 있습니다. 둘 다 정확히 동일한 대답을 제공합니다. 여기서 Reducer와 Combiner는 서로 다른 동기와 구현 방법을 가지고 있습니다.

package combiner; 

import java.io.IOException; 


import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class Map extends Mapper<LongWritable, Text, Text, Average> { 

Text name = new Text(); 
String[] row; 

protected void map(LongWritable offSet, Text line, Context context) throws IOException, InterruptedException { 
    row = line.toString().split(" "); 
    System.out.println("Key "+row[0]+"Value "+row[1]); 
    name.set(row[0]); 
    context.write(name, new Average(Integer.parseInt(row[1].toString()), 1)); 
}} 

클래스를

public class Reduce extends Reducer<Text, Average, Text, LongWritable> { 
    LongWritable avg =new LongWritable(); 
    protected void reduce(Text key, Iterable<Average> val, Context context)throws IOException, InterruptedException { 
    int total=0; int count=0; long avgg=0; 

    for (Average value : val){ 
     total+=value.number*value.count; 
     count+=value.count; 
     avgg=total/count; 
     } 
    avg.set(avgg); 
    context.write(key, avg); 
} 
} 

MapObject 클래스

public class Average implements Writable { 

long number; 
int count; 

public Average() {super();} 

public Average(long number, int count) { 
    this.number = number; 
    this.count = count; 
} 

public long getNumber() {return number;} 
public void setNumber(long number) {this.number = number;} 
public int getCount() {return count;} 
public void setCount(int count) {this.count = count;} 

@Override 
public void readFields(DataInput dataInput) throws IOException { 
    number = WritableUtils.readVLong(dataInput); 
    count = WritableUtils.readVInt(dataInput);  
} 

@Override 
public void write(DataOutput dataOutput) throws IOException { 
    WritableUtils.writeVLong(dataOutput, number); 
    WritableUtils.writeVInt(dataOutput, count); 

} 
} 

컴 클래스

public class Combine extends Reducer<Text, Average, Text, Average>{ 

protected void reduce(Text name, Iterable<Average> val, Context context)throws IOException, InterruptedException { 
    int total=0; int count=0; long avg=0; 

    for (Average value : val){ 
     total+=value.number; 
     count+=1; 
     avg=total/count;  
     } 
    context.write(name, new Average(avg, count)); 

} 
} 

드라이버 클래스

감소
public class Driver1 { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    if (args.length != 2) { 
     System.err.println("Usage: SecondarySort <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "CustomCobiner"); 
    job.setJarByClass(Driver1.class); 
    job.setMapperClass(Map.class); 
    job.setCombinerClass(Combine.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Average.class); 
    job.setReducerClass(Reduce.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class);  
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

힘내 here

의 코드는 UR 제안을 남겨주세요 ..