k-means 알고리즘을 수행하기 위해 map reduce 프로그램을 만들려고합니다. map reduce를 사용하는 것이 반복 알고리즘을 수행하는 최선의 방법이 아니라는 것을 알고 있습니다. 매퍼 및 감속기 클래스를 만들었습니다. 매퍼 코드에서 입력 파일을 읽었습니다. map reduce가 완료되면 같은 입력 파일에 결과가 저장되기를 원합니다. 출력 파일을 매퍼에서 입력 한 파일을 덮어 쓰게하려면 어떻게합니까?Hadoop Mapreduce, 맵 출력으로 맵퍼에 입력 된 txt 파일을 다시 쓰려면 어떻게해야합니까?
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.FileReader;
import java.io.BufferedReader;
import java.util.ArrayList;
public class kmeansMapper extends Mapper<Object, Text, DoubleWritable,
DoubleWritable> {
private final static String centroidFile = "centroid.txt";
private List<Double> centers = new ArrayList<Double>();
public void setup(Context context) throws IOException{
BufferedReader br = new BufferedReader(new
FileReader(centroidFile));
String contentLine;
while((contentLine = br.readLine())!=null){
centers.add(Double.parseDouble(contentLine));
}
}
public void map(Object key, Text input, Context context) throws IOException,
InterruptedException {
String[] fields = input.toString().split(" ");
Double rating = Double.parseDouble(fields[2]);
Double distance = centers.get(0) - rating;
int position = 0;
for(int i=1; i<centers.size(); i++){
Double cDistance = Math.abs(centers.get(i) - rating);
if(cDistance< distance){
position = i;
distance = cDistance;
}
}
Double closestCenter = centers.get(position);
context.write(new DoubleWritable(closestCenter),new
DoubleWritable(rating)); //outputs closestcenter and rating value
}
}
import java.io.IOException;
import java.lang.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.*;
public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable,
DoubleWritable, Text> {
public void reduce(DoubleWritable key, Iterable<DoubleWritable> values,
Context context)// get count // get total //get values in a string
throws IOException, InterruptedException {
Iterator<DoubleWritable> v = values.iterator();
double total = 0;
double count = 0;
String value = ""; //value is the rating
while (v.hasNext()){
double i = v.next().get();
value = value + " " + Double.toString(i);
total = total + i;
++count;
}
double nCenter = total/count;
context.write(new DoubleWritable(nCenter), new Text(value));
}
}
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class run
{
public static void runJob(String[] input, String output) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
Path toCache = new Path("input/centroid.txt");
job.addCacheFile(toCache.toUri());
job.setJarByClass(run.class);
job.setMapperClass(kmeansMapper.class);
job.setReducerClass(kmeansReducer.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setNumReduceTasks(1);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath,true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]);
}
}
감사
: 또한 그래서 이전 입력 파일과 새 입력 파일의 값은 값의 차이가 0.1내 코드입니다, 즉 수렴 할 때까지지도가 반복 처리를 줄일 수 있도록
답장을 보내 주셔서 감사합니다. runjob의 코드를 반복하는 방법은 무엇입니까? – th308
일반적인 for 루프에서 runJob의 코드 중 필요한 부분 만 감쌀 수 있습니다. –