2014-07-17 5 views
0

hadoop을 사용하는 동안 MultipleInputs를 사용했습니다. 여러 입력기에 여러 개의 Mappers가 할당되어 있습니다. EMR에서도 지원되는지 여부를 알고 싶습니다.EMR의 다중 입력 및 다중 매퍼 클래스 (EMR에서 Hadoop의 MultipleInputs와 비슷한 항목이 있습니까?)

hadoop에서 나는 이것을 이렇게했습니다. 이들은 다른 파일에 대한 내 맵퍼입니다. 여기에서는 입력을 개별적으로 식별하고 감속기에서 별도의 작업을 수행해야하는 여러 입력에 대해 몇 가지 작업을 수행해야하기 때문에 이러한 작업이 필요합니다.

public static class Map1 extends Mapper<Object, Text, Text, Text> { 
Text out=new Text(); 

Text value1= new Text(); 
public void map(Object key,Text value,Context context) throws IOException,InterruptedException 
    { 
    try 
     { 
     String line= value.toString(); 
     Configuration conf=context.getConfiguration(); 
     Float CVsTime=conf.getFloat("CVstartTime",0); 
     String dimension=conf.get("CVdimension"); 
     String CVfilter=conf.get("CVfilters"); 
     Float CVeTime=conf.getFloat("CVendTime",0); 
     Float CVstartTime=CVsTime; 
     Float CVendTime=CVeTime; 
     JSONParser parser = new JSONParser(); 
     Object obj=parser.parse(line); 
     JSONObject jsonObject=(JSONObject)obj; 
     Object datasttime=jsonObject.get("client_received_start_timestamp"); 
     String ddimension=""; 
     Object odimension=jsonObject.get(dimension); 
     if(odimension!=null) 
      ddimension=odimension.toString(); 
     String dst=datasttime.toString(); 
     dst=dst.substring(0,6)+"."+dst.substring(6,dst.length());     
     String metric=conf.get("CVmetric"); 
     Float tim=0.0f,/* sttime=0,endtime=0,*/CVval=0.0f; 
     tim=Float.parseFloat(dst.toString()); 
     Object met=jsonObject.get(metric); 
     CVval=Float.parseFloat(met.toString()); 
     int CVfiltercount = CVfilter.length() - CVfilter.replace(" ", "").length(); 

     String CVfilters[][]=new String[CVfiltercount][]; 
     StringTokenizer tokenizer=new StringTokenizer(CVfilter); 
     int k=0; 
     while(tokenizer.hasMoreTokens()) 
      { 
      String temptoken=tokenizer.nextToken(); 
      if(temptoken.indexOf("=")!=-1) 
       { 
       CVfilters[k]=temptoken.split("="); 
       CVfilters[k][1]=CVfilters[k][1].replace("\"",""); 
       k++; 
       } 
      } 
     int count=k; 
     int flag=0; 
     for(int i=0;i<k;i++) 
      { 
      Object filter=jsonObject.get(CVfilters[i][0]); 
      if(filter==null) 
       { 
       flag=1; 
       break; 
       } 
      if(!filter.toString().equals(CVfilters[i][1])) 
       { 
       flag=1; 
       break; 
       } 
      } 
     if((odimension!=null)&&(CVstartTime<=tim)&&(CVendTime>=tim)&&(flag==0)) 
      { 
      value1.set("key1"+" "+tim.toString()+" "+CVval.toString()); 
      out.set(ddimension); 
      context.write(out,value1); 
      } 
     flag=0; 
     } 
    catch(Exception e) 
     { 
     e.printStackTrace(); 
     } 
    } 
} 
public static class Map2 extends Mapper<Object, Text, Text, Text> 
{ 
    Text out = new Text(); 
    Text value2= new Text(); 
public void map(Object key,Text value,Context context) throws IOException,InterruptedException 
    { 
    try 
     { 
     Configuration conf=context.getConfiguration(); 
     Float CTVstartTime=conf.getFloat("CTVstartTime",0); 
     Float CTVendTime=conf.getFloat("CTVendTime",0); 
     String CTVfilter=conf.get("CTVfilters"); 
     String dimension=conf.get("CTVdimension"); 
     String line= value.toString(); 
     JSONParser parser = new JSONParser(); 
     Object obj=parser.parse(line); 
     JSONObject jsonObject=(JSONObject)obj; 
     Object datasttime=jsonObject.get("client_received_start_timestamp"); 
     Object odimension=jsonObject.get(dimension); 
     String ddimension=""; 
     if(odimension!=null) 
      ddimension=odimension.toString(); 
     String dst=datasttime.toString(); 
     dst=dst.substring(0,6)+"."+dst.substring(6,dst.length());     
     String metric=conf.get("CTVmetric"); 
     Float tim=0.0f,/*sttime=0,endtime=0,*/ctvvalue=0.0f;   
     StringTokenizer st=new StringTokenizer(line); 
     tim=Float.parseFloat(dst.toString()); 
     Object met=jsonObject.get(metric); 
     ctvvalue=Float.parseFloat(met.toString()); 
     int CTVfiltercount = CTVfilter.length() - CTVfilter.replace(" ", "").length(); 
     StringTokenizer tokenizer=new StringTokenizer(CTVfilter); 
     String CTVfilters[][]=new String[CTVfiltercount][]; 
     int k=0; 
     while(tokenizer.hasMoreTokens()) 
      { 
      String temptoken=tokenizer.nextToken(); 
      if(temptoken.indexOf("=")!=-1) 
       { 
       CTVfilters[k]=temptoken.split("="); 
       CTVfilters[k][1]=CTVfilters[k][1].replace("\"","");     
       k++; 
       } 
      } 
     int count=k; 
     int flag=0; 
     for(int i=0;i<k;i++) 
      { 
      Object filter=jsonObject.get(CTVfilters[i][0]); 
      if(filter==null) 
       { 
       flag=1; 
       break; 
       } 
      if(!filter.toString().equals(CTVfilters[i][1])) 
       flag=1; 

      } 
     if((odimension!=null)&&(CTVstartTime<=tim)&&(CTVendTime>=tim)&&(flag==0)) 
      { 
      value2.set("key2"+" "+tim.toString()+" "+ctvvalue.toString()); 
      out.set(ddimension); 
      context.write(out,value2); 
      } 
     } 
    catch(Exception e) 
     { 
     e.printStackTrace(); 
     } 
    } 
} 

내 주요 부분에서 hadoop에서 MultipleInputs를 사용했습니다. 여기에 내가 두 개의 서로 다른 입력 경로를 복용하고 위의 정의 그것은 완벽하게 작동하기 때문에 그들에게 다른 매퍼를 할당하고 여기에

job.setJobName("alert"); 
String MapPath1[]=args[1].split(","); 
String MapPath2[]; 
MapPath2 = type.equals("comparative") ? args[2].split(",") : null; 

Path outputPath; 
if (MapPath2!=null) 
    outputPath = new Path(args[3]); 
else 
    outputPath = new Path(args[2]); 
job.setMapperClass(Map1.class); 
if(type.equals("comparative")) 
    job.setMapperClass(Map2.class); 
job.setReducerClass(Reduce.class); 
job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
job.setOutputKeyClass(NullWritable.class); 
job.setOutputValueClass(Text.class); 
for(int i=0;i<MapPath1.length;i++) 
    MultipleInputs.addInputPath(job,new Path(MapPath1[i]),TextInputFormat.class,Map1.class); 
if(type.equals("comparative")) 
    for(int i=0;i<MapPath2.length;i++) 
    MultipleInputs.addInputPath(job,new Path(MapPath2[i]),TextInputFormat.class,Map2.class); 
FileOutputFormat.setOutputPath(job, outputPath); 

Map1.class 및 Map2.class

즉 서로 다른 입력에 대해 별도의 매퍼 클래스를 설정했습니다. 나는 EMR에서 같은 것이 가능한지 알아 내라. 나는 EMR에 관해서는 아무것도하지 않았다. 나는 인터넷 검색을 시도했지만 아무 것도 찾을 수 없었다. EMR이나 해결 방법에 해당하는 것이 있는지 알고 싶습니다. 나는 (path filePath = ((FileSplit) context.getInputSplit()). getPath();)을 사용하지 않으려는 경우를 제외하고 현재 입력 경로를 찾고자 할 때 데이터 나 파일의 청크 그것은에 속한다.

도움이 되셨습니까?

+0

당신은 내가 알고 싶습니다 비슷한 것을 알고있는 경우에 답변 해주세요. 나는 +1을 줄 것이다. 최근에 내 질문에 대한 답변이 없습니다. –

+0

시도해보고 결과가 보이는지 확인하십시오. –

답변

1

물론 지원됩니다. EMR은 Hadoop을 실행하는 곳입니다. 귀하의 질문은 "랩탑과 데스크탑 모두에서 웹 브라우저를 사용할 수 있습니까?"라고 말하는 것과 같습니다. 그게 네 질문에서 이해할 수있는거야.

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-hadoop-differences.html

+1

OK. 나는 EMR을 시도한 적이 없으며 그것에 대한 단서도 없다. 그러나 나는 알아 내도록 요청 받았다. 귀하의 답변을 뒷받침하는 것으로 확인되었습니다. http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-hadoop-differences.html –