2 개의 MapReduce 작업을 체인화해야합니다. JobControl을 사용하여 job2를 job1에 종속적으로 설정했습니다. 출력 파일이 만들어집니다 !! 하지만 멈추지 않습니다! 셸에서이 상태로 유지됩니다.(Hadoop) MapReduce - 체인 작업 - JobControl이 중지되지 않습니다
12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1
어떻게 중지 할 수 있습니까? 이것은 내 메인입니다.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Configuration conf2 = new Configuration();
Job job1 = new Job(conf, "canzoni");
job1.setJarByClass(CanzoniOrdinate.class);
job1.setMapperClass(CanzoniMapper.class);
job1.setReducerClass(CanzoniReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
ControlledJob cJob1 = new ControlledJob(conf);
cJob1.setJob(job1);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));
Job job2 = new Job(conf2, "songsort");
job2.setJarByClass(CanzoniOrdinate.class);
job2.setMapperClass(CanzoniSorterMapper.class);
job2.setSortComparatorClass(ReverseOrder.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setReducerClass(CanzoniSorterReducer.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
ControlledJob cJob2 = new ControlledJob(conf2);
cJob2.setJob(job2);
FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
JobControl jobctrl = new JobControl("jobctrl");
jobctrl.addJob(cJob1);
jobctrl.addJob(cJob2);
cJob2.addDependingJob(cJob1);
jobctrl.run();
////////////////
// NEW CODE ///
//////////////
// delete jobctrl.run();
Thread t = new Thread(jobctrl);
t.start();
String oldStatusJ1 = null;
String oldStatusJ2 = null;
while (!jobctrl.allFinished()) {
String status =cJob1.toString();
String status2 =cJob2.toString();
if (!status.equals(oldStatusJ1)) {
System.out.println(status);
oldStatusJ1 = status;
}
if (!status2.equals(oldStatusJ2)) {
System.out.println(status2);
oldStatusJ2 = status2;
}
}
System.exit(0);
} }
내가 JobControl을 시작하는 스레드를 사용하여 해결했다. 작업이 while 사이클을 사용하여 완료되었는지 확인했습니다 : while (! jobctrl.allFinished()) 및 System.exit()가 순환을 벗어났습니다. 이제는 작업이 정보 메시지를 반환하고 싶습니다. ControlledJob.toString()을 사용하면 실행중인 작업을 알 수 있습니다. 정보 메시지를 매퍼 작업 수, 줄이기 작업 수, 입력 또는 출력 등의 레코드로 가져 오는 방법을 알지 못합니다. 이러한 메시지를 가져 오는 방법은 무엇입니까? –
"job.getCounters(). toString()"충분합니까? – zsxwing
JobControl 클래스의 버그입니까? – Rags