Java-Client 'HECTOR'를 사용하여 Cassandra에 저장된 데이터에 간단한 map-reduce 작업을 실행하려고했습니다.Hector를 사용하여 Cassandra 데이터에서 mapreduce 실행
나는이 아름다운 blogpost에서 설명한 hadoop-wordcount 예제를 성공적으로 실행했습니다. 나는 또한 Hadoop Support 기사를 읽었습니다.
하지만 내가하고 싶은 것은 구현면에서 조금 다릅니다 (wordcount 예제는 mapreduce-site.xml이 언급 된 스크립트를 사용합니다). 누군가가 카산드라 데이터의 'HECTOR'에서 로컬이 아닌 분산 모드로지도 축소 작업을 실행하는 방법을 이해할 수 있도록 도와 드리겠습니다.
내 코드는 로컬 모드에서 map-reduce 작업을 성공적으로 실행합니다. 하지만 내가 원하는 것은 분산 모드에서 실행하고 결과를 cassandra 키 공간의 새로운 ColumnFamily로 작성하는 것입니다.
내가 분산 모드로 실행하기위한
$PATH_TO_HADOOP/conf/mapred-site.xml
(위에서 언급 한 블로그 게시물에서 언급)이 곳을 설정해야 할 수도 있습니다,하지만 난 어디 모르겠어요. [경고] 내가 얻을 수있다
여기에 내 코드
public class test_forum implements Tool {
private String KEYSPACE = "test_forum";
private String COLUMN_FAMILY ="posts";
private String OUTPUT_COLUMN_FAMILY = "output_post_count";
private static String CONF_COLUMN_NAME = "text";
public int run(String[] strings) throws Exception {
Configuration conf = new Configuration();
conf.set(CONF_COLUMN_NAME, "text");
Job job = new Job(conf,"test_forum");
job.setJarByClass(test_forum.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(ReducerToCassandra.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ByteBuffer.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
System.out.println("Job Set");
ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(),KEYSPACE,COLUMN_FAMILY);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate);
System.out.println("running job now..");
boolean success = job.waitForCompletion(true);
return success ? 0:1; //To change body of implemented methods use File | Settings | File Templates.
}
public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ByteBuffer sourceColumn;
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException
{
sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
{
IColumn column = columns.get(sourceColumn);
if (column == null) {
return;
}
String value = ByteBufferUtil.string(column.value());
System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
private ByteBuffer outputKey;
public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
byte[] keyBytes = word.getBytes();
outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length));
for (IntWritable val : values)
sum += val.get();
System.out.println(word.toString()+" -> "+sum);
context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
}
private static Mutation getMutation(Text word, int sum)
{
Column c = new Column();
c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
c.setTimestamp(System.currentTimeMillis());
Mutation m = new Mutation();
m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
m.column_or_supercolumn.setColumn(c);
System.out.println("Mutating");
return m;
}
}
public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException {
System.out.println("Working..!");
int ret=ToolRunner.run(new Configuration(), new test_forum(), args);
System.out.println("Done..!");
System.exit(ret);
}
} 여기
입니다 :
WARN - JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
그러나 코드가 성공적으로 작업을지도-감소 수행하는 실행,하지만 난 몰라 어디에서 데이터를 씁니까?
편집 : 출력용으로 cassandra에 columnFamily를 만들지 않았습니다. 그러므로 그것은 쓰는 것이 아니 었습니다. 이제 유일한 문제는 분산 모드에서 실행하는 방법입니다.
감사합니다.