2016-08-17 3 views
0

Hbase에있는 두 테이블의 mapside 조인을 시도하고 있습니다. 내 목표는 hashmap에 작은 테이블의 기록을 유지하고 큰 테이블과 비교하고, 일단 hbase의 테이블에 레코드를 기록합니다. 매퍼 (Mapper)와 감속기 (Reducer)를 사용하여 조인 (join) 연산을위한 비슷한 코드를 작성했는데, 그것은 잘 작동했으며 두 테이블 모두 매퍼 (mapper) 클래스에서 스캔되었습니다. 그러나 측면 결합 감소는 효율적이지 않으므로 매퍼 측에서만 테이블에 조인하려고합니다. 다음 코드에서 "블록 주석 처리"는 항상 false를 반환하고 첫 번째 테이블 (작은 테이블)은 읽지 않음을 확인하는 것입니다. 어떤 힌트도 도움이됩니다. HDP의 샌드 박스를 사용하고 있습니다.Hbase mapside join- 테이블 중 하나가 읽히지 않습니다? hbase 및 올바른 결과를 hbase에 읽습니다.

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
//import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper.Context; 
import org.apache.hadoop.util.Tool; 
import com.sun.tools.javac.util.Log; 
import java.io.IOException; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.*; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.TableMapper; 
import org.apache.hadoop.hbase.mapreduce.TableSplit; 

public class JoinDriver extends Configured implements Tool { 

    static int row_index = 0; 

     public static class JoinJobMapper extends TableMapper<ImmutableBytesWritable, Put> { 
     private static byte[] big_table_bytarr = Bytes.toBytes("big_table"); 
     private static byte[] small_table_bytarr = Bytes.toBytes("small_table"); 

     HashMap<String,String> myHashMap = new HashMap<String, String>(); 

     byte[] c1_value; 
     byte[] c2_value; 

     String big_table; 
     String small_table; 

     String big_table_c1; 
     String big_table_c2; 

     String small_table_c1; 
     String small_table_c2; 

     Text mapperKeyS; 
     Text mapperValueS; 
     Text mapperKeyB; 
     Text mapperValueB; 

     public void map(ImmutableBytesWritable rowKey, Result columns, Context context) { 
      TableSplit currentSplit = (TableSplit) context.getInputSplit(); 
      byte[] tableName = currentSplit.getTableName(); 

      try { 
       Put put = new Put(Bytes.toBytes(++row_index)); 


       // put small table into hashmap - myhashMap 
       if (Arrays.equals(tableName, small_table_bytarr)) { 

        c1_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1")); 
        c2_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2")); 
        small_table_c1 = new String(c1_value); 
        small_table_c2 = new String(c2_value); 

        mapperKeyS = new Text(small_table_c1); 
        mapperValueS = new Text(small_table_c2); 

        myHashMap.put(small_table_c1,small_table_c2); 


       } else if (Arrays.equals(tableName, big_table_bytarr)) { 
        c1_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1")); 
        c2_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2")); 
        big_table_c1 = new String(c1_value); 
        big_table_c2 = new String(c2_value); 

        mapperKeyB = new Text(big_table_c1); 
        mapperValueB = new Text(big_table_c2); 



      // if (set.containsKey(big_table_c1)){ 

        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(big_table_c1)); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 
        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(big_table_c2)); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 
        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"),Bytes.toBytes((myHashMap.get(big_table_c1)))); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 

      //  } 

       } 

      } catch (Exception e) { 
       // TODO : exception handling logic 
       e.printStackTrace(); 
      } 
     } 

    } 

    public int run(String[] args) throws Exception { 

     List<Scan> scans = new ArrayList<Scan>(); 



     Scan scan1 = new Scan(); 
     scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("small_table")); 
     System.out.println(scan1.getAttribute("scan.attributes.table.name")); 
     scans.add(scan1); 

     Scan scan2 = new Scan(); 
     scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("big_table")); 
     System.out.println(scan2.getAttribute("scan.attributes.table.name")); 
     scans.add(scan2); 

     Configuration conf = new Configuration(); 
     Job job = new Job(conf); 
     job.setJar("MSJJ.jar"); 
     job.setJarByClass(JoinDriver.class); 

     TableMapReduceUtil.initTableMapperJob(scans, JoinJobMapper.class, ImmutableBytesWritable.class, Put.class, job); 
     TableMapReduceUtil.initTableReducerJob("joined_table", null, job); 
     job.setNumReduceTasks(0); 


     job.waitForCompletion(true); 

     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     JoinDriver runJob = new JoinDriver(); 
     runJob.run(args); 

    } 

} 

답변

1

문제점 진술을 읽으면 다중 HBase 테이블 입력의 사용에 대해 잘못된 생각을 가지고 있다고 생각합니다. mapper 클래스의 setup 메소드에서 HashMap에 작은 테이블을로드하는 것이 좋습니다. 그런 다음 큰 테이블에서 맵 전용 작업을 사용하십시오. 맵 메소드에서 이전에로드 한 HashMap에서 해당 값을 페치 할 수 있습니다. 어떻게 해결되는지 알려주세요.