Spark Streaming HiveContext NullPointerException

I am writing a Spark Streaming application with Spark 1.6.0 on a CDH 5.8.3 cluster. The application is very simple: it reads from Kafka, applies some transformations to the DStream/RDD, and writes the result to a Hive table. I also tried a trivial piece of example code that just uses the sqlContext, but the error is still there.

My problem is that I cannot use the HiveContext inside the DStream's foreachRDD statement.

My code is the following:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sc = new SparkContext()
val sqlContext = new HiveContext(sc)
val ssc = new StreamingContext(sc, Minutes(sparkBatchInterval))
ssc.checkpoint(CHECKPOINT_DIR)
ssc.sparkContext.setLogLevel("WARN")

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokersList, "auto.offset.reset" -> "smallest")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))

val validatedAndPersisted = dstream.transform(rdd => {...}).persist(StorageLevel.MEMORY_AND_DISK_SER)
val recordsToBeIngested = ...
recordsToBeIngested.foreachRDD(rdd => {
  rdd.persist(StorageLevel.MEMORY_AND_DISK)

  val ingestCount = rdd.count
  if (ingestCount > 0) {
    sqlContext.tables("sc4").show() // here actually I should have an insertInto
  }
})
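
For reference, the insertInto mentioned in the comment could look like this in Spark 1.6 (a minimal sketch; the case class Record and the table name "sc4.my_table" are placeholders of mine, assuming the DStream elements match the Hive table's columns):

// Hypothetical replacement for the tables(...).show() call above.
// Record must be a top-level case class mirroring the table schema.
import sqlContext.implicits._
rdd.toDF().write.insertInto("sc4.my_table") // appends the batch into the Hive table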

And the error I get is this one:

Exception in thread "main" java.lang.NullPointerException 
    at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205) 
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554) 
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553) 
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540) 
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539) 
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252) 
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239) 
    at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:459) 
    at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459) 
    at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:475) 
    at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475) 
    at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474) 
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.tables(SQLContext.scala:855) 
    at myPackage.Ingestion$$anonfun$createStreamingContext$1.apply(Ingestion.scala:173) 
    at myPackage.Ingestion$$anonfun$createStreamingContext$1.apply(Ingestion.scala:166) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Does anyone have an idea about the reason for this error, or about how it can be fixed?

I found the answer by myself. Marco

Answer


Thank you. The problem was caused by the fact that I was creating the HiveContext before the StreamingContext. Moving its creation to after the StreamingContext is created solves the issue.
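
A minimal sketch of the fix, with only the initialization order changed:

val sc = new SparkContext()
val ssc = new StreamingContext(sc, Minutes(sparkBatchInterval)) // create the StreamingContext first
val sqlContext = new HiveContext(sc)                            // then the HiveContext
ssc.checkpoint(CHECKPOINT_DIR)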