
Spark Streaming Stateful Network Word Count

This is example code that ships with Spark. I have copied the code here; the original is at https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

14/07/20 11:52:57 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark] 
java.lang.NoSuchMethodError: java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView; 
    at org.apache.spark.streaming.scheduler.JobScheduler.getPendingTimes(JobScheduler.scala:114) 
    at org.apache.spark.streaming.Checkpoint.<init>(Checkpoint.scala:43) 
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Exception in thread "Thread-37" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638) 
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215) 
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) 
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) 
    at akka.actor.ActorCell.terminate(ActorCell.scala:338) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
14/07/20 11:53:00 ERROR Executor: Exception in task ID 0 
java.lang.IllegalStateException: cannot create children while terminating or terminated 
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:184) 
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42) 
    at akka.actor.ActorCell.attachChild(ActorCell.scala:338) 
    at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) 
    at org.apache.spark.scheduler.Task.run(Task.scala:51) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 
14/07/20 11:53:06 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[spark-akka.actor.default-dispatcher-13,5,main] 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = HeartBeat(BlockManagerId(<driver>, x-131-212-225-148.uofm-secure.wireless.umn.edu, 47668, 0))] 
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251) 
    at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:51) 
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:113) 
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(BlockManager.scala:158) 
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:790) 
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:158) 
    at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80) 
    at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241) 
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464) 
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281) 
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279) 
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630) 
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582) 
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582) 
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582) 
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596) 
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750) 
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753) 
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746) 
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746) 
    at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15) 
    at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746) 
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593) 
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) 
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) 
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) 
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) 
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#1887396223]] had already been terminated. 
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:236) 

However, when I try to run the program with the command "bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999", I get the errors shown above.

************* CODE ********************

// From org.apache.spark.examples.streaming in the Spark repo.
// StreamingExamples is a logging helper in that same examples package.
// To run this locally, first start a Netcat server with "nc -lk 9999",
// then launch the example as in the command above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // For each key, add this batch's counts to the running total carried in the state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I wonder whether this happens because I tried to set the checkpoint on the local file system by calling ssc.checkpoint("."), and that directory is not on a Hadoop-compatible file system (doesn't the checkpoint directory have to be Hadoop-compatible?). If so, how can I fix it? Thanks!
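
A minimal sketch of the alternative I have in mind (the HDFS host, port, and path below are hypothetical placeholders, not values from my setup):

// Any Hadoop-compatible URI is accepted as a checkpoint directory; a plain
// local path such as "." also works when running locally. The HDFS address
// here is a hypothetical placeholder.
ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")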

What do you mean by "the command"? Isn't it just the code above? Hope that helps ^^_ –

Sorry, I meant the command used to run the program. – user2895478

Which command are you using? –

Answer

I had a similar problem, and it comes down to whether the JRE at runtime is release 1.7 or 1.8. If you compile the Spark source code under 1.8 but then run it on a 1.7 JRE, this problem appears. Using a 1.8 runtime will resolve it.

JDK 1.8, ConcurrentHashMap.java (line 812):

// views 
private transient KeySetView<K,V> keySet; 
private transient ValuesView<K,V> values; 
private transient EntrySetView<K,V> entrySet; 

JDK 1.7, ConcurrentHashMap.java: there is no KeySetView type at all, and keySet() simply returns a plain java.util.Set<K>. Bytecode compiled against 1.8 therefore references the descriptor keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;, which a 1.7 runtime cannot resolve, producing exactly the NoSuchMethodError shown above.
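
To make the mismatch concrete, here is a minimal sketch (my own illustration, not from the original answer; KeySetProbe is a hypothetical name) that reproduces the same error when compiled on JDK 1.8 and run on a 1.7 JRE:

import java.util.concurrent.ConcurrentHashMap

object KeySetProbe {
  def main(args: Array[String]): Unit = {
    val counts = new ConcurrentHashMap[String, Int]()
    counts.put("word", 1)
    // When compiled against the 1.8 class library, this call site is
    // emitted with the descriptor
    //   keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
    // On a 1.7 JRE, keySet() returns a plain java.util.Set, so the method
    // lookup fails with java.lang.NoSuchMethodError -- the same failure
    // as in JobScheduler.getPendingTimes in the trace above.
    println(counts.keySet())
  }
}

The same mismatch is what bites Spark's own JobScheduler.getPendingTimes, which is why rebuilding on 1.7 or upgrading the runtime to 1.8 both make the example run.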