2017-04-10 8 views
0

안녕하세요 저는 HBase에서 레코드를 읽고 텍스트 파일에 기록하는 응용 프로그램을 실행하고 있습니다.HBase에서 mapoutput 파일을 생성하는 결합 자 mapreduce

저는 제 응용 프로그램 및 사용자 정의 분할기에서도 결합기를 사용했습니다. 커스텀 파티셔너에서 내 상태를 만족시키는 40 개의 감속기 출력 파일을 만들어야하기 때문에 응용 프로그램에서 41 개의 감속기를 사용했습니다.

잘 작동하지만 모든 응용 프로그램에서 결합기를 사용하면 맵 또는 매퍼마다 출력 파일이 만들어집니다.

나는 40 개의 영역을 내 응용 프로그램에 포함하므로 40 개 매퍼가 시작되어 40 개의 맵 출력 파일을 만듭니다. 그러나 감속기는 모든 맵 출력을 결합하고 40 감속기 출력 파일이 될 최종 감속기 출력 파일을 생성 할 수 없습니다.

파일의 데이터는 올 바르고 파일의 수가 증가하지 않았습니다.

어떻게하면 감속기 출력 파일 만 얻을 수 있습니까? 아래

// Reducer Class 
    job.setCombinerClass(CommonReducer.class); 
    job.setReducerClass(CommonReducer.class); // reducer class 

내 작업이 여기에

Submitted: Mon Apr 10 09:42:55 CDT 2017 
Started: Mon Apr 10 09:43:03 CDT 2017 
Finished: Mon Apr 10 10:11:20 CDT 2017 
Elapsed: 28mins, 17sec 
Diagnostics:  
Average Map Time 6mins, 13sec 
Average Shuffle Time 17mins, 56sec 
Average Merge Time 0sec 
Average Reduce Time  0sec 

자세한 사항 내가

context.write() 

multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName); 를 교체 한

import java.io.IOException; 
import org.apache.log4j.Logger; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 

public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> { 

    private Logger logger = Logger.getLogger(CommonCombiner.class); 
    private MultipleOutputs<NullWritable, Text> multipleOutputs; 
    String strName = ""; 
    private static final String DATA_SEPERATOR = "\\|\\!\\|"; 

    public void setup(Context context) { 
     logger.info("Inside Combiner."); 
     multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); 
    } 

    @Override 
    public void reduce(NullWritable Key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 

     for (Text value : values) { 
      final String valueStr = value.toString(); 
      StringBuilder sb = new StringBuilder(); 
      if ("".equals(strName) && strName.length() == 0) { 
       String[] strArrFileName = valueStr.split(DATA_SEPERATOR); 
       String strFullFileName[] = strArrFileName[1].split("\\|\\^\\|"); 

       strName = strFullFileName[strFullFileName.length - 1]; 


       String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR); 
       if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) { 
        sb.append(strArrvalueStr[0] + "|!|"); 
       } 
       multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName); 
       context.getCounter(Counters.FILE_DATA_COUNTER).increment(1); 


      } 

     } 
    } 


    public void cleanup(Context context) throws IOException, InterruptedException { 
     multipleOutputs.close(); 
    } 
} 

답변

0

내 감속기 논리입니다와 나는 CORR있어 출력.