스파크의 솔루션은 방송을 사용하고 있습니다. 작업에서 해당 데이터를 사용하는 경우 Spark는 작업이 실행되기 전에 해당 데이터가 있는지 확인합니다. 예를 들면 : 당신이 드라이버 메모리에 데이터를 읽고 집행까지를 보내지 않도록하려면
object MySparkTransformation {
def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = {
val mySharedData: Map[String, Int] = loadDataOnce()
val broadcast = sc.broadcast(mySharedData)
rdd.map(r => broadcast.value(r))
}
}
또는, 당신은 한 번씩 을 채워됩니다 값을 생성하기 위해 스칼라 object
에 lazy
값을 사용할 수 있습니다 JVM, 이는 Spark의 경우 집행자 당 한 번입니다. 예를 들어 : 실제로
// must be an object, otherwise will be serialized and sent from driver
object MySharedResource {
lazy val mySharedData: Map[String, Int] = loadDataOnce()
}
// If you use mySharedData in a Spark transformation,
// the "local" copy in each executor will be used:
object MySparkTransformation {
def transform(rdd: RDD[String]): RDD[Int] = {
// Spark won't include MySharedResource.mySharedData in the
// serialized task sent from driver, since it's "static"
rdd.map(r => MySharedResource.mySharedData(r))
}
}
, 각 집행에 mySharedData
의 복사본 하나를해야합니다.
이것은 모두 사실이지만 "SharedData"가 직렬화 가능하지 않고 직렬화 가능하지만 크다면 너무 효율적이지는 않습니다.드라이버 응용 프로그램에서 직접 작성한 "SharedData"를 Spark 변환으로 사용하면 직렬화되어 작업 ** 당 ** 집행자에게 전송됩니다. –
@TzachZohar 공유 데이터에 대한 좋은 지적은 작업 당 실행자에게 보냈습니다. 예, SharedData에 대한 브로드 캐스트 변수를 사용하면이 문제를 방지하는 데 도움이됩니다. 그러나 직렬화 요구 사항은 클로저 변수와 브로드 캐스트 변수 모두에 적용됩니다. –
예, 직렬화 가능성 요구 사항이 방송에도 적용됩니다. 하지만 "정적"초기화 옵션에 대해서도 언급하지 않았습니다. (필자가 올바르게 읽는다면) OP가 목표로하는 것입니다. –