2012-09-11 4 views
9

2 개의 MapReduce 작업을 체인화해야합니다. JobControl을 사용하여 job2를 job1에 종속적으로 설정했습니다. 출력 파일이 만들어집니다 !! 하지만 멈추지 않습니다! 셸에서이 상태로 유지됩니다.(Hadoop) MapReduce - 체인 작업 - JobControl이 중지되지 않습니다

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1 
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1 

어떻게 중지 할 수 있습니까? 이것은 내 메인입니다.

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Configuration conf2 = new Configuration(); 

    Job job1 = new Job(conf, "canzoni"); 
    job1.setJarByClass(CanzoniOrdinate.class); 
    job1.setMapperClass(CanzoniMapper.class); 
    job1.setReducerClass(CanzoniReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob1 = new ControlledJob(conf); 
    cJob1.setJob(job1); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp")); 


    Job job2 = new Job(conf2, "songsort"); 
    job2.setJarByClass(CanzoniOrdinate.class); 
    job2.setMapperClass(CanzoniSorterMapper.class); 
    job2.setSortComparatorClass(ReverseOrder.class); 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    job2.setReducerClass(CanzoniSorterReducer.class); 
    job2.setMapOutputKeyClass(IntWritable.class); 
    job2.setMapOutputValueClass(Text.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    cJob2.setJob(job2); 
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[1])); 

    JobControl jobctrl = new JobControl("jobctrl"); 
    jobctrl.addJob(cJob1); 
    jobctrl.addJob(cJob2); 
    cJob2.addDependingJob(cJob1); 
    jobctrl.run(); 


    //////////////// 
    // NEW CODE /// 
    ////////////// 


    // delete jobctrl.run(); 
    Thread t = new Thread(jobctrl); 
    t.start(); 
    String oldStatusJ1 = null; 
    String oldStatusJ2 = null; 
    while (!jobctrl.allFinished()) { 
     String status =cJob1.toString(); 
     String status2 =cJob2.toString(); 
     if (!status.equals(oldStatusJ1)) { 
     System.out.println(status); 
     oldStatusJ1 = status; 
     } 
     if (!status2.equals(oldStatusJ2)) { 
     System.out.println(status2); 
     oldStatusJ2 = status2; 
     }  
    } 
    System.exit(0); 

} }

+1

내가 JobControl을 시작하는 스레드를 사용하여 해결했다. 작업이 while 사이클을 사용하여 완료되었는지 확인했습니다 : while (! jobctrl.allFinished()) 및 System.exit()가 순환을 벗어났습니다. 이제는 작업이 정보 메시지를 반환하고 싶습니다. ControlledJob.toString()을 사용하면 실행중인 작업을 알 수 있습니다. 정보 메시지를 매퍼 작업 수, 줄이기 작업 수, 입력 또는 출력 등의 레코드로 가져 오는 방법을 알지 못합니다. 이러한 메시지를 가져 오는 방법은 무엇입니까? –

+0

"job.getCounters(). toString()"충분합니까? – zsxwing

+0

JobControl 클래스의 버그입니까? – Rags

답변

5

나는 기본적으로 피에트로 위에서 언급 무엇을했다.

public class JobRunner implements Runnable { 
    private JobControl control; 

    public JobRunner(JobControl _control) { 
    this.control = _control; 
    } 

    public void run() { 
    this.control.run(); 
    } 
} 

내지도에

는/내가 가진 클래스를 감소 :

public void handleRun(JobControl control) throws InterruptedException { 
    JobRunner runner = new JobRunner(control); 
    Thread t = new Thread(runner); 
    t.start(); 

    while (!control.allFinished()) { 
     System.out.println("Still running..."); 
     Thread.sleep(5000); 
    } 
} 

하는 난 그냥 jobControl 객체를 전달합니다.

+2

+1 실례를 제공함 – beterthanlife

3

JobControl 객체 자체는 Runnable를, 그래서 당신은 다음과 같이 사용할 수 있습니다 :

new Thread(myJobControlInstance).start() 
0

공유 한 것을 sinemetu1 코드 조각에 그냥 팅겨 ..

당신은에 전화를 놓을 수 있습니다 사용자가 JobControl가 만 새로운 스레드를 실행할 수 있습니다 것을 확인 어디 그 자체로 JobControl 같은 JobRunner 내가이 링크 우연히

 Thread thread = new Thread(jobControl); 
     thread.start(); 

     while (!jobControl.allFinished()) { 
      System.out.println("Still running..."); 
      Thread.sleep(5000); 
     } 

Runnable를 구현합니다. https://www.mail-archive.com/[email protected]/msg00556.html

0

이 시도 :

Thread jcThread = new Thread(jobControl); 
    jcThread.start(); 
    System.out.println("循环判断jobControl运行状态 >>>>>>>>>>>>>>>>"); 
    while (true) { 
     if (jobControl.allFinished()) { 
     System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList()); 
     jobControl.stop(); 
     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 

    if (jobControl.getFailedJobList().size() > 0) { 
     succ = 0; 
     System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList()); 
     jobControl.stop(); 

     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 
}