4

다음 코드로 EMR 작업을 실행하는 데 성공했습니다. 이제 실행 상태도 모니터링하고 싶습니다. DescribeJobFlows API를 사용하려 했지만, 이 API는 http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/AmazonElasticMapReduceClient.html 문서에 따르면 더 이상 사용되지 않습니다(deprecated). AWS EMR 작업 진행 상태를 모니터링하는 가장 좋은 방법은 무엇입니까?

DescribeJobFlows가 더 이상 사용되지 않으므로(deprecated), EMR 실행 진행 상황을 모니터링하는 모범 사례를 알려 주실 수 있습니까?

public class EmrJobRunner { 
    public static void main(String[] args) { 
    // args is [input_file_path, output_directory], make sure output_directory does not exist 
    String inputFilePath = "s3://mybucket/emr/input"; 
    String outputDirectory = "s3://mybucket/emr/output/" + System.currentTimeMillis(); 
    String jarName = "WordCount.jar"; 
    String jarPath = "s3://mybucket/emr/" + jarName; 
    String logPath = "s3://mybucket/emr/logs"; 

    String TERMINATE_JOB_FLOW = "TERMINATE_JOB_FLOW"; 
    String CONTINUE = "CONTINUE"; 

    AWSCredentials credentials = new BasicAWSCredentials("pub_key", "sec_key"); 
    StepFactory stepFactory = new StepFactory(); 

    AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials); 
    emr.setRegion(Region.getRegion(Regions.AP_SOUTHEAST_1)); 

    StepConfig enableDebugging = new StepConfig() 
     .withName("Enable debugging") 
     .withActionOnFailure(TERMINATE_JOB_FLOW) 
     .withHadoopJarStep(stepFactory.newEnableDebuggingStep()); 

    StepConfig installHive = new StepConfig() 
     .withName("Install Hive") 
     .withActionOnFailure(TERMINATE_JOB_FLOW) 
     .withHadoopJarStep(stepFactory.newInstallHiveStep()); 

    StepConfig runScript = new StepConfig() 
     .withName("Run Script") 
     .withActionOnFailure(CONTINUE) 
     .withHadoopJarStep(stepFactory.newRunHiveScriptStep("s3://dummy/dummy.hive")); 

    List<String> jarArgs = Arrays.asList(inputFilePath, outputDirectory); 
    HadoopJarStepConfig jarCfg= new HadoopJarStepConfig() 
     .withJar(jarPath) 
     .withArgs(jarArgs); 
    StepConfig runJar = new StepConfig() 
     .withName(jarName) 
     .withActionOnFailure(TERMINATE_JOB_FLOW) 
     .withHadoopJarStep(jarCfg); 

    JobFlowInstancesConfig instanceCfg = new JobFlowInstancesConfig() 
     .withKeepJobFlowAliveWhenNoSteps(false) 
     .withTerminationProtected(true) 
     .withInstanceCount(3) 
     .withMasterInstanceType(InstanceType.C1Medium.toString()) 
     .withSlaveInstanceType(InstanceType.C1Medium.toString()) 
     .withHadoopVersion("2.4.0"); 

    List<StepConfig> steps = Arrays.asList(enableDebugging, installHive, runScript, runJar); 

    RunJobFlowRequest request = new RunJobFlowRequest() 
     .withName("My EMR Job Flow") 
     .withAmiVersion("3.3.2") 
     .withInstances(instanceCfg) 
     .withLogUri(logPath); 
     .withSteps(steps); 

    RunJobFlowResult result = emr.runJobFlow(request); 
    // saying DescribeJobFlows is deprecated 
    // DescribeJobFlowsResult jobFlowDescResult = emr.DescribeJobFlows(DescribeJobFlowsRequest describeJobFlowsRequest); 
    } 

} 

답변

4

DescribeJobFlows는 더 이상 사용되지 않습니다(deprecated). 클러스터 상태를 모니터링하는 것이 작업 실행 진행 상황을 확인할 수 있는 또 다른 방법입니다.

RunJobFlowResult runJobResult = emr.runJobFlow(runJobFlowRequest);
System.out.printf("Run JobFlowId is: %s\n", runJobResult.getJobFlowId());

// Poll the cluster state every 30 seconds until it reaches TERMINATED or
// TERMINATED_WITH_ERRORS. The job flow id returned by runJobFlow is used as
// the cluster id for describeCluster.
while (true) {
    DescribeClusterRequest desc = new DescribeClusterRequest()
        .withClusterId(runJobResult.getJobFlowId());
    DescribeClusterResult clusterResult = emr.describeCluster(desc);
    Cluster cluster = clusterResult.getCluster();
    String status = cluster.getStatus().getState();
    System.out.printf("Status: %s\n", status);

    if (status.equals(ClusterState.TERMINATED.toString())
            || status.equals(ClusterState.TERMINATED_WITH_ERRORS.toString())) {
        break;
    }

    try {
        TimeUnit.SECONDS.sleep(30);
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop polling, so the interrupt is
        // not swallowed and the loop can actually be cancelled.
        Thread.currentThread().interrupt();
        break;
    }
    // Handling for other cluster states could be added here.
}