결함 허용 동작을 시뮬레이트하고 싶습니다. 나는 수시로 실패한 "딱딱한"기능을 썼다. 예를 들면 : 스파크는 결함 허용 능력을 가지고 있기 때문에Apache Spark에서 실패한 작업을 다시 제출하지 않는 이유는 무엇입니까?
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if (raise)
throw new RuntimeException("oh ;(")
"Ok"
}
, 나는 실패한 작업이 자동으로 다시 실행됩니다,하지만 내가 뭐하는 거지 다음 코드
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object Example {
def main(args:Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val conf = new SparkConf()
.setAppName("shuffle example")
.setMaster("local[*]")
.set("spark.task.maxFailures", "4") // it is default value
val sc = new SparkContext(conf)
val l:RDD[String] = sc.parallelize(List("a", "b", "c"), 3)
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if (raise)
throw new Exception("oh ;(")
"Ok"
}
println (l.map(myMap).collect().mkString("\n")) // failed
sc.stop()
}
}
에서 발생되지 않은 것으로 예상 잘못된?
예, 작업이 임의로 실행되거나 실패했습니다. 나는 spark이 실패한 작업을 다시 제출 (재실행) 할 것으로 예상했다. –
스파크는 사용자가 던진 예외 때문에 내부 실패로 인해 작업을 다시 제출하지 않습니다. –
좋아, SparkException 및 RuntimeException 던져 봤지만 결과가 동일합니다. 실제로 결함 허용 동작을 시뮬레이트하고 싶습니다. 내가 어떻게 할 수 있니? –