
Reading from Accumulo with Spark

I am trying to use the Spark shell to connect to an Accumulo table.

I launch Spark and load the libraries I need as follows:

$ bin/spark-shell --jars /data/bigdata/installs/accumulo-1.7.2/lib/accumulo-fate.jar:/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-core.jar:/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-trace.jar:/data/bigdata/installs/accumulo-1.7.2/lib/htrace-core.jar:/data/bigdata/installs/accumulo-1.7.2/lib/libthrift.jar
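(Note that spark-shell's --jars option is documented as a comma-separated list rather than a colon-separated classpath, so the colon-separated string above may not register every jar. A comma-separated variant of the same command, using the same paths, would be:

$ bin/spark-shell --jars /data/bigdata/installs/accumulo-1.7.2/lib/accumulo-fate.jar,/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-core.jar,/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-trace.jar,/data/bigdata/installs/accumulo-1.7.2/lib/htrace-core.jar,/data/bigdata/installs/accumulo-1.7.2/lib/libthrift.jar
)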

Then I paste the following into the shell:

import org.apache.hadoop.mapred.JobConf 
import org.apache.hadoop.conf.Configuration 
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, AccumuloInputFormat} 
import org.apache.accumulo.core.client.security.tokens.PasswordToken 

import org.apache.hadoop.conf.Configuration 
import org.apache.accumulo.core.security.Authorizations 
import org.apache.accumulo.core.client.ClientConfiguration 

import org.apache.spark.{SparkConf, SparkContext} 

import org.apache.accumulo.core.client.mapred.InputFormatBase 


val user  = "root" 
val tableName = "hd_history" 
val instanceName = "GISCIENCE" 
val zooKeepers = "localhost:2181" 
val token = new PasswordToken("***") 

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
//conf.registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],classOf[org.apache.spark.api.java.JavaSparkContext])) 
val sc = new SparkContext(conf) 

val jobConf = new JobConf() // Create a job conf 

// Configure the job conf with accumulo properties 
AbstractInputFormat.setConnectorInfo(jobConf, user, token) 
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations) 
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers) 
AbstractInputFormat.setZooKeeperInstance(jobConf, clientConfig) 
InputFormatBase.setInputTableName(jobConf, tableName) 
// Create an RDD using the jobConf 
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value] 
) 
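(A side note on the snippet above: the JobConf is configured through the old-API org.apache.accumulo.core.client.mapred setters, while newAPIHadoopRDD is handed the new-API org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat. The input split reported in the log below shows the settings are being picked up anyway, but a version that stays within the new-API package avoids mixing the two Hadoop APIs. A sketch, assuming Accumulo 1.7's mapreduce classes and reusing the values defined above:

import org.apache.hadoop.mapreduce.Job
import org.apache.accumulo.core.client.mapreduce.{AbstractInputFormat => NewAbstractInputFormat, AccumuloInputFormat => NewAccumuloInputFormat, InputFormatBase => NewInputFormatBase}

// a Job is used here only as a carrier for the Hadoop Configuration;
// the renames avoid clashing with the mapred imports already in scope
val job = Job.getInstance()
NewAbstractInputFormat.setConnectorInfo(job, user, token)
NewAbstractInputFormat.setScanAuthorizations(job, new Authorizations)
NewAbstractInputFormat.setZooKeeperInstance(job, clientConfig)
NewInputFormatBase.setInputTableName(job, tableName)

val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
    classOf[NewAccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value])
)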

When I try

rdd2.count()

I get the following:
16/07/18 18:30:43 INFO spark.SparkContext: Starting job: count at <console>:38 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:38) with 1 output partitions 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at <console>:38) 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Parents of final stage: List() 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Missing parents: List() 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35), which has no missing parents 
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1776.0 B, free 148.9 KB) 
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1110.0 B, free 150.0 KB) 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:39461 (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35) 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
16/07/18 18:30:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2284 bytes) 
16/07/18 18:30:43 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1) 
16/07/18 18:30:43 INFO rdd.NewHadoopRDD: Input split: Range: (-inf,+inf) Locations: [localhost] Table: hd_history TableID: 8 InstanceName: GISCIENCE zooKeepers: localhost:2181 principal: root tokenSource: INLINE authenticationToken: [email protected]db28e3 authenticationTokenFile: null Authorizations: offlineScan: false mockInstance: false isolatedScan: false localIterators: false fetchColumns: [] iterators: [] logLevel: INFO 
16/07/18 18:30:43 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 2082 bytes result sent to driver 
16/07/18 18:30:43 ERROR scheduler.TaskResultGetter: Exception while getting task result 
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) 
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) 
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311) 
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: ResultStage 1 (count at <console>:38) failed in 0.029 s 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Job 1 failed: count at <console>:38, took 0.040014 s 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 2 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 1 
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157) 
    ... 48 elided 

It is not clear to me which classes I need to register with Kryo (i.e., how to find out which class the referenced ID 13994 belongs to), or whether that is actually the problem.
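For reference, the registration hinted at by the commented-out line above has to be applied to the SparkConf before the SparkContext is constructed; whether it alone would fix the ID-13994 mismatch is exactly what the question asks, so treat this only as a sketch. The names kryoConf and kryoSc are illustrative, and Key/Value are assumed to be the classes that actually travel back to the driver:

// a minimal sketch of registering the Accumulo classes with Kryo;
// it must be applied before the SparkContext is created from this conf
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value]))
val kryoSc = new SparkContext(kryoConf)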

Answer


spark-shell had already created a Spark context (sc) when it started, and I was creating another one on top of it. Calling

sc.stop() 

on the shell's existing context before creating my own solved my problem.
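In other words, a sketch of the fixed sequence in the same spark-shell session: stop the context the shell created first, so that only one context, with one serializer configuration, is live.

sc.stop()   // shut down the SparkContext that spark-shell created at startup

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// in spark-shell the master and app name usually come through as system properties
val sc = new SparkContext(conf)

With a single context created from this conf, the jobConf setup and the newAPIHadoopRDD call from the question can then be run unchanged.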