2017-01-23 7 views
0

mapreduce 코드는 빈 출력 파일을 생성합니다. 코드와 입력은 아래와 같습니다.mapreduce 코드는 빈 출력 파일을 생성합니다. 코드와 입력 값은 다음과 같습니다.

package temperature; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 


import java.io.IOException; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 
public class TemperatureMapper extends Mapper<Text, Text, Text, IntWritable> { 

    @Override 
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException { 
    if (isValueValid(value.toString())) { 
     Text key2 = new Text(getStateFromValue(value.toString())); 
     IntWritable value2 = new IntWritable(getTemperatureFrom(value.toString())); 
     context.write(key2, value2); 
    } 
    } 

    private boolean isValueValid(final String value) { 
    // We expect that the value is a String in the form of : State, Temperature. E.g. MP,77 
    Pattern p = Pattern.compile("\\S\\S\\,\\d+"); 
    Matcher m = p.matcher(value); 
    return m.matches(); 
    } 

    private String getStateFromValue(final String value) { 
    final String[] subvalues = value.split("\\,"); 
    return subvalues[0]; 
    } 

    private int getTemperatureFrom(final String value) { 
    final String[] subvalues = value.split("\\,"); 
    return Integer.parseInt(subvalues[1]); 
    } 
} 

    public class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 

    @Override 
    protected void reduce(final Text key, final Iterable<IntWritable> values, final Context context) throws IOException, InterruptedException { 
    int sumOfTemperatures = 0; 
    int nbValues = 0; 
    int average=0; 
    for (IntWritable temperature : values) { 
     sumOfTemperatures += temperature.get(); 
     nbValues++; 
    } 
    average = sumOfTemperatures/nbValues; 
    context.write(key, new IntWritable(average)); 
    } 
} 
public class average { 

    public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    /*if (otherArgs.length != 2) { 
     System.err.println("Usage: Main <in> <out>"); 
     System.exit(-1); 
    }*/ 
    Job job = new Job(conf, "Calculate average Temperature"); 
    job.setInputFormatClass(KeyValueTextInputFormat.class); 
    FileInputFormat.addInputPath(job, new Path(otherArgs[1])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 

    job.setJarByClass(average.class); 

    job.setMapperClass(TemperatureMapper.class); 
    job.setReducerClass(TemperatureReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    System.exit(job.waitForCompletion(true) ? 0 : -1); 
    } 

} 

코드는 입력에 대해 잘 작동 :

Ujjain MP,77 
Bhopal MP,76 
Indore MP,72 
Raipur CG,72 
Durg CG,75 
Raigarth  CG,70 
Kendujhar OR,69 
Bhubaneswar OR,71 
Puri OR,76 

하지만 같은 일부 임의 입력 :

hello VI,6 
bye RE,2 

그것은 오히려 빈 출력 파일을 생성합니다.

+0

당신이 value.toString "의 출력을 공유 할 수 불평하지 않는 이유를 잘 모르겠습니다, 값 LongWritable에서 키 유형을 변경했다() "전에지도 방법에 조건이 있으십니까? 아니면 jobtracker가 귀하의지도가 입력으로 2 개의 레코드를 받았음을 보여주는 것을 보시겠습니까? – Amit

답변

0

은 국가 나는 그것이 도움이되기를 바랍니다

String[] subvalues = value.split("\\,")[0].split(" "); 
return subvalues[subvalues.length - 1]; 

을 얻기 위해 다시

Pattern p = Pattern.compile("[a-zA-Z]*\\s*[a-zA-Z]{2},\\d+$"); 

또한 입력의 종류를 지원하기 위해 다음, 당신이 필요합니다 분할을위한 정규 표현식을 수정합니다. 내 옆에서, 내가, 우리의 옆에 아마 다른 API 버전

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {