2017-04-10 9 views
1

결함 허용 동작을 시뮬레이트하고 싶습니다. 나는 수시로 실패한 "딱딱한"기능을 썼다. 예를 들면 : 스파크는 결함 허용 능력을 가지고 있기 때문에Apache Spark에서 실패한 작업을 다시 제출하지 않는 이유는 무엇입니까?

def myMap(v: String) = { 
    // print task info and return "Ok" or throw exception 
    val context = TaskContext.get() 
    val r = scala.util.Random 
    val raise = r.nextBoolean() 
    println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise") 
    if (raise) 
    throw new RuntimeException("oh ;(") 
    "Ok" 
} 

, 나는 실패한 작업이 자동으로 다시 실행됩니다,하지만 내가 뭐하는 거지 다음 코드

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkConf, SparkContext, TaskContext} 

object Example { 

    def main(args:Array[String]): Unit = { 
    Logger.getLogger("org").setLevel(Level.WARN) 
    Logger.getLogger("akka").setLevel(Level.WARN) 

    val conf = new SparkConf() 
     .setAppName("shuffle example") 
     .setMaster("local[*]") 
     .set("spark.task.maxFailures", "4") // it is default value 

    val sc = new SparkContext(conf) 


    val l:RDD[String] = sc.parallelize(List("a", "b", "c"), 3) 

    def myMap(v: String) = { 
     // print task info and return "Ok" or throw exception 
     val context = TaskContext.get() 
     val r = scala.util.Random 
     val raise = r.nextBoolean() 
     println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise") 
     if (raise) 
     throw new Exception("oh ;(") 
     "Ok" 
    } 

    println (l.map(myMap).collect().mkString("\n")) // failed 

    sc.stop() 
    } 
} 

에서 발생되지 않은 것으로 예상 잘못된?

답변

0

실제로 스파크는 로컬 모드에서 내결함성을 지원하지 않습니다.

위 예제에서 standalone (또는 yarn) 클러스터의 일부 마스터로 설정하고 jar 파일을 만들고 spark-submit을 통해 실행하면 예상대로 작동합니다. 일부 작업은 실패하지만, 제출. 응용 프로그램에 단일 스 튼 (Scala의 객체)이 있으면 실패한 작업에서 자체 상태를 유지합니다.

1

임의 변수 "r"은 스파크 실행과 관련없이 임의로 true/false를 반환합니다.

당신은 초기화 할 때 :

val r = scala.util.Random 
val raise = r.nextBoolean() 

인상은 임의가 참 또는 거짓 생성 가져옵니다. 그래서 당신

if (raise) 
     throw new Exception("oh ;(") 
     "Ok" 

는 무작위로 작동합니다. 나는 당신이 성취하려고하는 것을 얻지 못합니다. 은 내가

--- map a in partition 0 in stage 0 raise = false 

인상 그래서 언젠가는 나에 언젠가는 실패 할 것 인 무작위로 생성 된 부울 값입니다 얻을 내가 때

--- map a in partition 0 in stage 0 raise = true 
--- map b in partition 1 in stage 0 raise = false 

그것을 다시 실행 다음과 같은 출력을 얻을.

+0

예, 작업이 임의로 실행되거나 실패했습니다. 나는 spark이 실패한 작업을 다시 제출 (재실행) 할 것으로 예상했다. –

+1

스파크는 사용자가 던진 예외 때문에 내부 실패로 인해 작업을 다시 제출하지 않습니다. –

+0

좋아, SparkException 및 RuntimeException 던져 봤지만 결과가 동일합니다. 실제로 결함 허용 동작을 시뮬레이트하고 싶습니다. 내가 어떻게 할 수 있니? –