저는 Apache Hadoop, MapReduce 및 Cassandra를 사용하여 Cassandra 테이블에서 읽고 다른 Cassandra 테이블로 출력하는 MapReduce 작업을 실행하고 있습니다.hadoop의 복합 기본 키를 사용하여 cassandra 테이블에 삽입하기
단일 기본 키가있는 테이블로 출력되는 작업이 몇 가지 있습니다. 예를 들어, 각 단어 유형의 수를 계산하기위한이 테이블에는 단일 키가 있습니다.
CREATE TABLE word_count(
word text,
count int,
PRIMARY KEY(text)
) WITH COMPACT STORAGE;
클래스를 줄이기 보이는 관련된이 같은 비트 :
public static class ReducerToCassandra
extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
public void reduce(Text word, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
org.apache.cassandra.thrift.Column c
= new org.apache.cassandra.thrift.Column();
c.setName(ByteBufferUtil.bytes("count");
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation mutation = new Mutation();
mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
mutation.column_or_supercolumn.setColumn(c);
ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
context.write(keyByteBuffer, Collections.singletonList(mutation));
}
}
내가 여분의 열을 추가하려면, 그럼 난 그냥 reduce
하여 List<Mutation>
이미 출력되는 또 다른 돌연변이를 추가해야하지만, 복합 기본 키의 새 열이있는 테이블로 출력하는 방법을 알아낼 수 없습니다. 예를 들어이 표는 위의 표와 동일하지만 단어를 발행 시간과 함께 색인화합니다.
CREATE TABLE word_count(
word text,
publication_hour bigint,
count int,
PRIMARY KEY(word, publication_hour)
) WITH COMPACT STORAGE;
나는 출력하려고처럼, 몇 가지 다른 방법을 시도했습니다 사용자 정의 WritableComparable
과 class
및 method
서명하고 이에 따라 job
구성을 갱신 (즉 단어와 시간을 모두 보유),하지만 그 reduce
던져합니다 a ClassCastException
WritableComparable
을 ByteBuffer
으로 캐스팅하려고 시도 할 때
적절한 열 이름을 Builder
과 함께 구축하려고 시도했습니다.
public static class ReducerToCassandra
// MappedKey MappedValue ReducedKey ReducedValues
extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
{
// MappedKey Values with the key wordHourPair
public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
long hour = wordHourPair.getHourLong();
org.apache.cassandra.thrift.Column c
= new org.apache.cassandra.thrift.Column();
c.setName(ByteBufferUtil.bytes("count");
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation mutation = new Mutation();
mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
mutation.column_or_supercolumn.setColumn(c);
//New Code
List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>();
keyTypes.add(UTF8Type.instance);
keyTypes.add(LongType.instance);
CompositeType compositeKey = CompositeType.getInstance(keyTypes);
Builder builder = new Builder(compositeKey);
builder.add(ByteBufferUtil.bytes(word.toString());
builder.add(ByteBufferUtil.bytes(hour));
ByteBuffer keyByteBuffer = builder.build();
context.write(keyByteBuffer, Collections.singletonList(mutation));
}
}
하지만 그를 슬로우 IOException
java.io.IOException: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)
이 질문 : Cassandra CQL3 composite key not written by Hadoop reducer 내가 찾고 있어요 코드의 종류를 전시 보이지만 유형 HashMap, ByteBuffer
의 파라미터를 가지는 context.write
를 호출하고 아니에요 context.write
에 해당 매개 변수를 허용하는 방법을 알려주세요.
내 테이블에 원하는 데이터 (워드 시간 키, int 값)를 얻으려면 어떻게해야합니까?