2017-05-23 5 views
0

내 개발 환경은 centos7, hbase 1.2.5, happybase 1.1.0, python 2.7, PyCharm, hadoop 2.7.3, spark 2.1입니다. 큰 데이터 소프트웨어를 개발 중입니다. 값을 HBase 테이블에 넣어야합니다. Spark RDD의 값입니다. 다음은 코드입니다.happybase 통해 hbase 테이블에 값을 넣는 방법?

import happybase 
from pyspark import SparkContext, SparkConf 

connection = happybase.Connection('localhost') 
table = connection.table('tablename') 
conf = SparkConf().setAppName("myFirstSparkApp").setMaster("local") 
sc = SparkContext(conf=conf) 
distFile = sc.textFile("/inputFilePath/") 
newLines = distFile.filter(lambda x: 'filter":' in x) 
newLines = newLines.map(lambda line:line.split('"')) 
# The following line is working. Insert a row into the table. 
table.put(b'row-key0', {'billCode:': '222', 'trayCode:': '222', 'pipeline:': '333'}) 
# But the following line is not working. what is wrong? Why? 
newLines.foreach(lambda x: table.put(b'row-key', {'billCode:': x[7], 'trayCode:': x[3], 'pipeline:': x[11]})) 

마지막 줄 코드가 작동하지 않습니다. 오류 메시지가 없습니다 :

ImportError를 : pickle.PicklingError cybin라는 이름의 모듈 : ImportError를 : : 오브젝트를 직렬화하지 못했습니다

cybin라는 이름의 모듈 내가 스파크 + happybase + 파이썬의 새로운 개발자입니다. 그것을 해결하는 방법? 도움이 필요하면 제발. 고맙습니다.

+0

오류 메시지를 읽으십시오 - 귀하의 질문에 함께 표시되지 않습니다 – Drako

+0

이 행의 코드는 디버깅을 통해 작동하지 않습니다. newLines.foreach (lambda x : table.put (b'row-key ', {'billCode : ': x [7],'trayCode : ': x [3],'pipeline : ': x [11] }))) –

답변

0

다음은 간단한 예입니다.

import happybase 
from pyspark import SparkContext, SparkConf 
conf = SparkConf().setAppName("App").setMaster("local") 
sc = SparkContext(conf=conf) 
rdd = parallelize([("a","1"),("b","2")]) 
def func(x): 
    conn = happybase.Connection('localhost') 
    table = conn.table("table_name") 
    table.put(x[0],{"cf:c":x[1]}) 
    conn.close() 
rdd.foreach(func) 

그러나 완벽하지, 당신은 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd 행운을 참조 할 수 있습니다.