2017-12-07 31 views
0

k-means 알고리즘을 수행하기 위해 map reduce 프로그램을 만들려고합니다. map reduce를 사용하는 것이 반복 알고리즘을 수행하는 최선의 방법이 아니라는 것을 알고 있습니다. 매퍼 및 감속기 클래스를 만들었습니다. 매퍼 코드에서 입력 파일을 읽었습니다. map reduce가 완료되면 같은 입력 파일에 결과가 저장되기를 원합니다. 출력 파일을 매퍼에서 입력 한 파일을 덮어 쓰게하려면 어떻게합니까?Hadoop Mapreduce, 맵 출력으로 맵퍼에 입력 된 txt 파일을 다시 쓰려면 어떻게해야합니까?

import java.io.IOException; 
import java.util.StringTokenizer; 
import java.util.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import java.util.ArrayList; 


public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, 
DoubleWritable> { 
private final static String centroidFile = "centroid.txt"; 
private List<Double> centers = new ArrayList<Double>(); 

public void setup(Context context) throws IOException{ 
     BufferedReader br = new BufferedReader(new 
     FileReader(centroidFile)); 
     String contentLine; 
     while((contentLine = br.readLine())!=null){ 
      centers.add(Double.parseDouble(contentLine)); 
     } 
} 

public void map(Object key, Text input, Context context) throws IOException, 
InterruptedException { 

     String[] fields = input.toString().split(" "); 
     Double rating = Double.parseDouble(fields[2]); 
     Double distance = centers.get(0) - rating; 
     int position = 0; 
     for(int i=1; i<centers.size(); i++){ 
      Double cDistance = Math.abs(centers.get(i) - rating); 
      if(cDistance< distance){ 
       position = i; 
       distance = cDistance; 
      } 
     } 
     Double closestCenter = centers.get(position); 
     context.write(new DoubleWritable(closestCenter),new 
DoubleWritable(rating)); //outputs closestcenter and rating value 

     } 
} 
import java.io.IOException; 
import java.lang.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Reducer; 
import java.util.*; 

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, 
DoubleWritable, Text> { 

public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, 
Context context)// get count // get total //get values in a string 
      throws IOException, InterruptedException { 
      Iterator<DoubleWritable> v = values.iterator(); 
      double total = 0; 
      double count = 0; 
      String value = ""; //value is the rating 
      while (v.hasNext()){ 
       double i = v.next().get(); 
       value = value + " " + Double.toString(i); 
       total = total + i; 
       ++count; 
      } 
      double nCenter = total/count; 
    context.write(new DoubleWritable(nCenter), new Text(value)); 
} 
} 
import java.util.Arrays; 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class run 
{ 

public static void runJob(String[] input, String output) throws Exception { 

    Configuration conf = new Configuration(); 

    Job job = new Job(conf); 
    Path toCache = new Path("input/centroid.txt"); 
    job.addCacheFile(toCache.toUri()); 
    job.setJarByClass(run.class); 
    job.setMapperClass(kmeansMapper.class); 
    job.setReducerClass(kmeansReducer.class); 
    job.setMapOutputKeyClass(DoubleWritable.class); 
    job.setMapOutputValueClass(DoubleWritable.class); 

    job.setNumReduceTasks(1); 
    Path outputPath = new Path(output); 
    FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath,true); 
    job.waitForCompletion(true); 

} 

public static void main(String[] args) throws Exception { 
    runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]); 

} 

} 

감사

: 또한 그래서 이전 입력 파일과 새 입력 파일의 값은 값의 차이가 0.1

내 코드입니다, 즉 수렴 할 때까지지도가 반복 처리를 줄일 수 있도록

답변

0

나는 당신이 면책 조항을 기재했음을 알고있다.하지만 스파크 나 메모리 문제를 해결할 수있는 다른 프레임 워크로 전환 해주십시오. 너의 삶은 훨씬 나아질거야.

정말로이 작업을 수행하려면 runJob에서 반복적으로 코드를 실행하고 입력에 임시 파일 이름을 사용하십시오. 이를 달성하기 위해 this question on moving files in hadoop을 볼 수 있습니다. 각 반복이 완료된 후

광범위하게
FileSystem fs = FileSystem.get(new Configuration()); 
Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input'; 

말하기, 당신이 입력을 설정해야 첫 번째 반복에 대해 물론

fs.delete(tempInputPath) 
fs.rename(outputPath, tempInputPath) 

을 : 당신은 파일 시스템 인스턴스와 입력을위한 임시 파일이 필요합니다 path는 작업을 실행할 때 제공된 입력 경로가됩니다. 후속 반복에서는 이전 반복의 출력이 될 tempInputPath를 사용할 수 있습니다.

+0

답장을 보내 주셔서 감사합니다. runjob의 코드를 반복하는 방법은 무엇입니까? – th308

+0

일반적인 for 루프에서 runJob의 코드 중 필요한 부분 만 감쌀 수 있습니다. –