2016-11-23 9 views
5

나는 불꽃을 사용하는 것의 차이점이 무엇인지 궁금 해서요 mapPartitions 대 일시적인 게으른 val.
각 파티션은 기본적으로 다른 노드에서 실행되기 때문에 각 노드 (객체에있는 것으로 가정)에 일시적인 지연 val의 단일 인스턴스가 만들어집니다. 당신도 그것을 원하는 이유스파크 mapPartitions 대 일시적인 게으른 val

class NotSerializable(v: Int) { 
    def foo(a: Int) = ??? 
} 

object OnePerPartition { 
    @transient lazy val obj: NotSerializable = new NotSerializable(10) 
} 

object Test extends App{ 
    val conf = new SparkConf().setMaster("local[2]").setAppName("test") 
    val sc = new SparkContext(conf) 

    val rdd: RDD[Int] = sc.parallelize(1 to 100000) 

    rdd.map(OnePerPartition.obj.foo) 

    // ---------- VS ---------- 

    rdd.mapPartitions(itr => { 
     val obj = new NotSerializable(10) 
     itr.map(obj.foo) 
    }) 
} 

하나는,
내가 어떤 일반적인 컬렉션을 구현 (RDD 내 논리를 실행하기위한 일반 컨테이너 개념을 생성하고 싶습니다 ... List을 요청할 수 있습니다 : 예를 들어

, scalding pipe 등)
모두 "지도"라는 개념이 있지만 mapPartitionspark에 고유합니다.

답변

2

우선 여기 transientlazy이 필요하지 않습니다. object 래퍼를 사용하여이 작업을하기에 충분하고 실제로으로 이것을 쓸 수 있습니다 : 객체 래퍼 mapPartitions 내부 NotSerializable를 초기화하는 사이에 근본적인 차이가 있습니다

object OnePerExecutor { 
    val obj: NotSerializable = new NotSerializable(10) 
} 

. 이것은 :

rdd.mapPartitions(iter => { 
    val ns = NotSerializable(1) 
    ??? 
}) 

은 파티션 당 하나의 NotSerializable 인스턴스를 만듭니다.

반면에 객체 래퍼는 각 실행 프로그램 JVM에 대해 단일 NotSerializable 인스턴스를 생성합니다. 결과적으로이 인스턴스는 다음과 같습니다.

  • 여러 파티션을 처리하는 데 사용할 수 있습니다.
  • 여러 실행 프로그램 스레드에서 동시에 액세스 할 수 있습니다.
  • 사용되는 함수 호출을 초과하는 수명이 있습니다.

이것은 스레드로부터 안전해야하며 모든 메소드 호출은 부작용이 없어야 함을 의미합니다.