2012-04-13 1 views
2

Java-Client 'HECTOR'를 사용하여 Cassandra에 저장된 데이터에 간단한 map-reduce 작업을 실행하려고했습니다.Hector를 사용하여 Cassandra 데이터에서 mapreduce 실행

나는이 아름다운 blogpost에서 설명한 hadoop-wordcount 예제를 성공적으로 실행했습니다. 나는 또한 Hadoop Support 기사를 읽었습니다.

하지만 내가하고 싶은 것은 구현면에서 조금 다릅니다 (wordcount 예제는 mapreduce-site.xml이 언급 된 스크립트를 사용합니다). 누군가가 카산드라 데이터의 'HECTOR'에서 로컬이 아닌 분산 모드로지도 축소 작업을 실행하는 방법을 이해할 수 있도록 도와 드리겠습니다.

내 코드는 로컬 모드에서 map-reduce 작업을 성공적으로 실행합니다. 하지만 내가 원하는 것은 분산 모드에서 실행하고 결과를 cassandra 키 공간의 새로운 ColumnFamily로 작성하는 것입니다.

내가 분산 모드로 실행하기위한
$PATH_TO_HADOOP/conf/mapred-site.xml
(위에서 언급 한 블로그 게시물에서 언급)이 곳을 설정해야 할 수도 있습니다,하지만 난 어디 모르겠어요. [경고] 내가 얻을 수있다

여기에 내 코드

public class test_forum implements Tool { 

private String KEYSPACE = "test_forum"; 
private String COLUMN_FAMILY ="posts"; 
private String OUTPUT_COLUMN_FAMILY = "output_post_count"; 
private static String CONF_COLUMN_NAME = "text"; 


public int run(String[] strings) throws Exception { 

    Configuration conf = new Configuration(); 

    conf.set(CONF_COLUMN_NAME, "text"); 
    Job job = new Job(conf,"test_forum"); 

    job.setJarByClass(test_forum.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(ReducerToCassandra.class); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 

    job.setOutputKeyClass(ByteBuffer.class); 
    job.setOutputValueClass(List.class); 

    job.setOutputFormatClass(ColumnFamilyOutputFormat.class); 
    job.setInputFormatClass(ColumnFamilyInputFormat.class); 


    System.out.println("Job Set"); 


    ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); 
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost"); 
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); 

    ConfigHelper.setInputColumnFamily(job.getConfiguration(),KEYSPACE,COLUMN_FAMILY); 
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); 

    SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text"))); 

    ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate); 

    System.out.println("running job now.."); 

    boolean success = job.waitForCompletion(true); 

    return success ? 0:1; //To change body of implemented methods use File | Settings | File Templates. 

} 



public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> 
{ 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 
    private ByteBuffer sourceColumn; 
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) 
    throws IOException, InterruptedException 
    { 
     sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); 
    } 

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException 
    { 



     IColumn column = columns.get(sourceColumn); 

     if (column == null) { 
      return; 
     } 

     String value = ByteBufferUtil.string(column.value()); 
     System.out.println("read " + key + ":" + value + " from " + context.getInputSplit()); 

     StringTokenizer itr = new StringTokenizer(value); 

     while (itr.hasMoreTokens()) 
     { 
      word.set(itr.nextToken()); 
      context.write(word, one); 
     } 
    } 


} 

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> 
{ 
    private ByteBuffer outputKey; 

    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
    { 
     int sum = 0; 

     byte[] keyBytes = word.getBytes(); 
     outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length)); 


     for (IntWritable val : values) 
      sum += val.get(); 

     System.out.println(word.toString()+" -> "+sum); 
     context.write(outputKey, Collections.singletonList(getMutation(word, sum))); 

    } 

    private static Mutation getMutation(Text word, int sum) 
    { 
     Column c = new Column(); 
     c.setName(Arrays.copyOf(word.getBytes(), word.getLength())); 
     c.setValue(ByteBufferUtil.bytes(String.valueOf(sum))); 
     c.setTimestamp(System.currentTimeMillis()); 

     Mutation m = new Mutation(); 
     m.setColumn_or_supercolumn(new ColumnOrSuperColumn()); 
     m.column_or_supercolumn.setColumn(c); 
     System.out.println("Mutating"); 
     return m; 

    } 

} 




public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException { 

    System.out.println("Working..!"); 

    int ret=ToolRunner.run(new Configuration(), new test_forum(), args); 

    System.out.println("Done..!"); 

    System.exit(ret); 

} 

} 여기

입니다 :

WARN - JobClient     - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
WARN - JobClient     - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 

그러나 코드가 성공적으로 작업을지도-감소 수행하는 실행,하지만 난 몰라 어디에서 데이터를 씁니까?

편집 : 출력용으로 cassandra에 columnFamily를 만들지 않았습니다. 그러므로 그것은 쓰는 것이 아니 었습니다. 이제 유일한 문제는 분산 모드에서 실행하는 방법입니다.

감사합니다.

답변

2

클래스로 jar 파일을 만들었습니까?

클러스터를 통해 작업 클래스를 전파 할 수 있으려면 Hadoop에 jar 파일이 있어야합니다. 그렇지 않은 경우 "작업 병 파일 세트 없음"오류와 왜 분산 모드로 실행할 수 없는지 설명합니다. "hadoop jar ..."명령으로 작업을 시작하고 jar 종속성을 추가하십시오 (적어도 apache-cassandra!). 직업을 제출할 때 카산드라 서버가 작동하고 리틀 포트를 청취해야합니다.

그런데 하둡과 카산드라는 헥터가 필요하지 않습니다. ColumnFamilyInputFormat (및 ColumnFamilyOutputFormat)은 자체적으로 카산드라 (Cassandra)에 데이터를 읽고 (쓰는) 방법을 명시하지 않습니다. 그렇기 때문에 RpcPort, InitialAdressPartionner (구성 했음)을 구성해야합니다.

마지막주의 사항 : ColumnFamilyOutputFormat은 출력 열 패밀리를 만들지 않습니다. 이미 존재해야합니다. 그렇지 않으면 작성시 오류가 발생합니다. 이 도움이

희망,

벤와