2017-10-26 9 views
0

I wrote a Spark Streaming script similar to the classic word count. In addition, I try to store all of the incoming data in a collection (addedRDD), but after a while exceptions start being thrown because the blocks are gone. Is it possible to keep this accumulated RDD in memory? Spark Streaming does not retain the data.

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.rdd.RDD 

// `sc` is the SparkContext provided by spark-shell; 5-second batch interval
val ssc = new StreamingContext(sc, Seconds(5)) 
val lines = ssc.socketTextStream("localhost", 9999) 

// Accumulator that should grow with every batch
var addedRDD: RDD[String] = sc.emptyRDD[String] 

lines.foreachRDD { rdd => 
  // Union the new batch into the accumulated RDD and cache the result
  addedRDD = addedRDD.union(rdd).cache() 
  addedRDD.collect().foreach(println) 
} 

// Standard streaming word count on the same stream
val words = lines.flatMap(_.split(" ")) 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 

wordCounts.print() 
ssc.start() 
ssc.awaitTermination() 

The exception being thrown (it says that the blocks have been removed) is the following:

ERROR scheduler.JobScheduler: Error running job streaming job 1509014590000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed! 
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83) 
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547) 
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972) 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
scala.collection.Iterator$class.foreach(Iterator.scala:727) 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
scala.collection.AbstractTraversable.map(Traversable.scala:105) 
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420) 
     at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:981) 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) 
     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940) 
     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
     at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:47) 
     at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:44) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed! 
     at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83) 
     at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
     at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
     at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
     at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
     at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
     at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
     at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
     at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47) 
     at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547) 
     at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972) 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) 
     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Answer

0

The error above is an OutOfMemory exception: after a few RDDs have been added, the executors run out of memory because the RDD is kept in memory, and by default cache() stores the RDD in memory only.

The option below will help by storing the RDD with DISK_ONLY. There are several other options, such as MEMORY_AND_DISK, MEMORY_AND_DISK_SER, MEMORY_ONLY and MEMORY_ONLY_SER; each also has a _2 suffix that replicates every partition on two nodes, and the _SER variants store the data serialized on disk or in memory.

import org.apache.spark.storage.StorageLevel 

// Spill the accumulated RDD to disk instead of keeping it in executor memory
addedRDD = addedRDD.union(rdd).persist(StorageLevel.DISK_ONLY) 
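
For reference, a minimal sketch of the alternatives listed above; a storage level can only be assigned once per RDD, so pick exactly one:

import org.apache.spark.storage.StorageLevel 

// Choose one level; persist() cannot be changed after it has been assigned
addedRDD = addedRDD.union(rdd).persist(StorageLevel.MEMORY_AND_DISK)        // memory first, spill to disk
// addedRDD = addedRDD.union(rdd).persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized: smaller, slightly slower
// addedRDD = addedRDD.union(rdd).persist(StorageLevel.MEMORY_AND_DISK_2)   // each partition replicated on two nodes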
+0

Hi, unfortunately the exception is not an out-of-memory one; DISK_ONLY only means reducing the memory footprint. I have edited my question to add the full stack trace. Indeed, editing the code as suggested does not change the behavior, and the error still occurs. –

+0

**Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed!** I guess the problem here is with the streaming; try increasing the timeout: `val ssc = new StreamingContext(sc, Seconds(5))` – Vignesh

+0

And try this question, which looks exactly like yours: https://stackoverflow.com/questions/33077746/spark-rdd-block-removed-before-use, and don't forget to try the updated section. – Vignesh
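
For readers landing here: the approach in that linked question amounts to periodically checkpointing the accumulated RDD, so that its lineage no longer reaches back to the receiver's BlockRDDs once their blocks have been purged. Below is a minimal sketch of that idea; the checkpoint directory, the remember window and the 10-batch checkpoint interval are illustrative assumptions, not values taken from the question:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.rdd.RDD 

val ssc = new StreamingContext(sc, Seconds(5)) 
// Keep generated RDDs (and their blocks) around longer than the default cleanup window
ssc.remember(Minutes(1)) 
// Checkpointing needs a reliable directory (HDFS in production); this local path is illustrative
ssc.checkpoint("/tmp/spark-checkpoint") 

val lines = ssc.socketTextStream("localhost", 9999) 
var addedRDD: RDD[String] = sc.emptyRDD[String] 
var batches = 0L 

lines.foreachRDD { rdd => 
  addedRDD = addedRDD.union(rdd).cache() 
  batches += 1 
  // Every 10 batches, cut the lineage so the union no longer references old BlockRDDs; 
  // the checkpoint data is actually written by the collect() that follows 
  if (batches % 10 == 0) addedRDD.checkpoint() 
  addedRDD.collect().foreach(println) 
} 

ssc.start() 
ssc.awaitTermination() 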