2015-01-31 2 views
2

HBase를 처음 사용했습니다. HBase의 셀에 여러 버전을 저장하려고하지만 마지막으로 저장된 값만 가져옵니다.HBase 셀에 여러 버전 저장하기

: 0.0150 초 입력 파일은 다음과 같다에서

ROW     COLUMN+CELL            
abc     column=backward:first, timestamp=1422722312845, value=rrb 

1 행을 다음과 같이 두 가지 출력을 반환 get 'Dummy1','abc', {COLUMN=>'backward:first', VERSIONS=>12}scan 'Dummy1', {VERSIONS=>12} : 여러 개의 저장 버전을 검색하려면 다음 두 명령을 시도

abc xyz kkk 
abc qwe asd 
abc anf rrb 
는 는 HBase를 표 생성을위한 코드는 다음과 같이된다

:

import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HTableDescriptor; 
import org.apache.hadoop.hbase.client.HBaseAdmin; 

public class HBaseTableCreator { 

    public static void main(String[] args) throws Exception { 

     HBaseConfiguration conf = new HBaseConfiguration(); 
     conf.set("hbase.master","localhost:60000"); 

     HBaseAdmin hbase = new HBaseAdmin(conf); 
     HTableDescriptor desc = new HTableDescriptor("Dummy"); 
     HColumnDescriptor meta = new HColumnDescriptor("backward".getBytes()); 
     meta.setMaxVersions(Integer.MAX_VALUE); 
     HColumnDescriptor prefix = new HColumnDescriptor("forward".getBytes()); 
     prefix.setMaxVersions(Integer.MAX_VALUE); 
     desc.addFamily(meta); 
     desc.addFamily(prefix); 
     hbase.createTable(desc); 

} 

} 
,

HBase에서 데이터를 덤프하는 코드는 다음과 같습니다. 주 클래스 : import java.io.IOException;

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 


public class TestMain { 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
    { 
     // TODO Auto-generated method stub 
     Configuration conf=new Configuration(); 
     //HTable hTable = new HTable(conf, args[3]); 
     String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs(); 
     if(otherArgs.length!=2) 
     { 
      System.err.println("Usage: wordcount <in> <out>"); 
      System.exit(2); 
     } 
     Job job=new Job(conf,"HBase dummy dump"); 
     job.setJarByClass(TestMain.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     job.setMapperClass(TestMapper.class); 
     TableMapReduceUtil.initTableReducerJob("Dummy", null, job); 
     //job.setOutputKeyClass(NullWritable.class); 
     //job.setOutputValueClass(Text.class); 
     job.setNumReduceTasks(0); 
     //job.setOutputKeyClass(Text.class); 
     //job.setOutputValueClass(Text.class); 
     FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
     //HFileOutputFormat.configureIncrementalLoad(job, hTable); 
     System.exit(job.waitForCompletion(true)?0:1); 
    } 
} 

매퍼 클래스 :

import java.io.IOException; 

import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 

public class TestMapper extends Mapper <LongWritable, Text, Text, Put>{ 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

     String line=value.toString(); 
     String[] l=line.split("\\s+"); 
     for(int i=1;i<l.length;i++) 
     { 
      Put HPut = new Put(l[0].getBytes()); 
      HPut.add("backward".getBytes(),"first".getBytes(),l[i].getBytes()); 
      context.write(new Text(l[0]),HPut); 
     } 
    } 
} 

내가 잘못거야 곳 알려주세요.

답변

1

당신의 문제는 쓰기 작업이 자동으로 일괄 처리되고 작업이 끝날 때 (테이블이 닫힐 때) 플러시되어 모든 작업이 정확히 동일한 타임 스탬프를 갖게된다는 점입니다. 기본적으로 덮어 쓰기 (다른 타임 스탬프와 같은 타임 스탬프를 가진 버전을 쓰는 것은 새로운 버전을 삽입하는 대신 해당 버전을 덮어 씁니다).

문제를 해결하기위한 첫 번째 방법은 직접 Put HPut = new Put(l[0].getBytes(), System.currentTimeMillis());과 함께 타임 스탬프를 제공하는 것이지만 너무 빨라 많은 풋의 타임 스탬프가 동일하므로 아마 같은 문제에 직면하게 될 것입니다.

1 정지를 HBase를 테이블에 쓰기를 처리하는 사용자 정의 감속기에 찬성 TableMapReduceUtil.initTableReducerJob를 사용하여 :

이것은 내가 이것을 극복하기 위해 할 것 인 것이다. 어느 정도 일을 내 자신의 감속기를 구현 : (abc, xyz kkk qwe asd anf rrb 예)

3 그들이 감속기에 반복자로 분류하고 전달받을 수 있도록

2 컨텍스트에 각 행의 모든 ​​값을 기록 할 수있는 매퍼를 수정 이 의사 같은 :

Define myHTable 
setup() { 
    Instantiate myHtable 
    Disable myHtable autoflush to prevent puts from being automatically flushed 
    Set myHtable write buffer to at least 2MB 
} 
reduce(rowkey, results) { 
    baseTimestamp = current time in milliseconds 
    Iterate results { 
    Instantiate put with rowkey ++baseTimestamp 
    Add result to put 
    Send put to myHTable 
    } 
} 
cleanup() { 
    Flush commits for myHTable 
    Close myHTable 
} 

그런 식으로, 항상 각 버전 간의 1ms의가있을 것입니다, 당신이 조심있어 유일한은 당신이 버전의 거대한 숫자를 가지고 같은 작업을 실행하는 경우 여러 번, 새 작업의 타임 스탬프는 다음의 타임 스탬프와 겹칠 수 있습니다. 이전 버전의 경우 30k 미만 버전을 기대한다면 각 작업이 다음 작업에서 적어도 30 초 이상 떨어져 있기 때문에 걱정할 필요가 없습니다.

어쨌든, 백 버전 (http://hbase.apache.org/book.html#versions) 이상이 필요한 경우 버전이없는 키 접근 방식 (키 + 타임 스탬프가 포함 된 복합 행 키)을 사용하는 것이 현명합니다.

이상한 서식을 사용하여 죄송합니다. 의사 코드가 멋지게 표시되도록하는 유일한 방법이었습니다.

+0

테이블에 데이터를 넣으려면 HBase 셸을 사용하고 있습니다. 하지만 버전을 구할 수는 없습니다. 나는 최신 가치만을 얻는다. 버전 저장을 "켜기"로 전환 할 수있는 HBase 시스템 구성이나 테이블/열 특정 설정이 있습니까? –