2017-10-03 9 views
0

기본적으로 공유 메모리에서 데이터를 읽으려면 동일한 노드/executor에서 실행되는 여러 태스크가 필요합니다. 이를 위해 작업을 시작하기 전에 데이터를 메모리에로드하는 초기화 함수가 필요합니다. Spark이 Executor 시작을위한 후크를 제공한다면이 초기화 코드를이 콜백 함수에 넣을 수 있습니다.이 초기화 코드는이 시작이 완료된 후에 만 ​​실행됩니다.Spark에서 Executor Startup을위한 후크가 있습니까?

그래서 제 질문은 스파크가 그런 고리를 제공합니까? 그렇지 않은 경우 다른 방법을 사용하면 동일한 결과를 얻을 수 있습니까?

답변

0

여러 개의 작업 (예 : 하나의 앱 인스턴스, 하나의 스파크 작업)을 실행할 수 있도록 앱의 여러 인스턴스를 실행할 필요는 없습니다. 동일한 SparkSession 객체를 여러 스레드가 사용하여 Spark 작업을 병렬로 제출할 수 있습니다.

그래서 다음과 같이 작동 할 수 있습니다

  • 응용 프로그램이 시작되고 메모리에 공유 데이터를로드 할 초기화 함수를 실행합니다. 말해, SharedData 클래스 개체에.
  • SparkSession 각 스레드 (SparkSession, SharedData)에 액세스 할
  • 개체 각 스레드는 공유 SparkSession 및 SharedData 개체를 사용하여 점화 태스크를 생성
  • 쓰레드 풀이 생성되어 생성된다.
  • 사용 사례에 따라, 응용 프로그램은 다음 중 하나를 수행 : 모두 완료 할 작업에 대한
    • 대기를하고 새 요청이 도착하기를
    • 대기 루프에서 세션 불꽃 닫고 새로운 불꽃을 생성 필요에 따라 스레드 풀에서 스레드를 사용하여 작업. 사용 동시에 취소 할 수 있습니다 setJobGroup 때문에 관련 작업을 사용하여 작업에 그룹을 setJobDescription를 사용하여 작업 설명을 할당하거나 할당 같은 당 스레드 일을 할 때

SparkContext (sparkSession.sparkContext는) 유용 cancelJobGroup. 동일한 풀을 사용하는 작업에 대한 우선 순위를 조정할 수도 있습니다 (자세한 내용은 https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application 참조). 당신이 드라이버 응용 프로그램에서 한 번에 데이터를로드 어디 스파크를 직렬화하고 집행 (한 번)에 각각 전송 - "공유 데이터"에 대한

+1

이것은 모두 사실이지만 "SharedData"가 직렬화 가능하지 않고 직렬화 가능하지만 크다면 너무 효율적이지는 않습니다.드라이버 응용 프로그램에서 직접 작성한 "SharedData"를 Spark 변환으로 사용하면 직렬화되어 작업 ** 당 ** 집행자에게 전송됩니다. –

+0

@TzachZohar 공유 데이터에 대한 좋은 지적은 작업 당 실행자에게 보냈습니다. 예, SharedData에 대한 브로드 캐스트 변수를 사용하면이 문제를 방지하는 데 도움이됩니다. 그러나 직렬화 요구 사항은 클로저 변수와 브로드 캐스트 변수 모두에 적용됩니다. –

+1

예, 직렬화 가능성 요구 사항이 방송에도 적용됩니다. 하지만 "정적"초기화 옵션에 대해서도 언급하지 않았습니다. (필자가 올바르게 읽는다면) OP가 목표로하는 것입니다. –

0

스파크의 솔루션은 방송을 사용하고 있습니다. 작업에서 해당 데이터를 사용하는 경우 Spark는 작업이 실행되기 전에 해당 데이터가 있는지 확인합니다. 예를 들면 : 당신이 드라이버 메모리에 데이터를 읽고 집행까지를 보내지 않도록하려면

object MySparkTransformation { 

    def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = { 
    val mySharedData: Map[String, Int] = loadDataOnce() 
    val broadcast = sc.broadcast(mySharedData) 
    rdd.map(r => broadcast.value(r)) 
    } 
} 

또는, 당신은 한 번씩 을 채워됩니다 값을 생성하기 위해 스칼라 objectlazy 값을 사용할 수 있습니다 JVM, 이는 Spark의 경우 집행자 당 한 번입니다. 예를 들어 : 실제로

// must be an object, otherwise will be serialized and sent from driver 
object MySharedResource { 
    lazy val mySharedData: Map[String, Int] = loadDataOnce() 
} 

// If you use mySharedData in a Spark transformation, 
// the "local" copy in each executor will be used: 
object MySparkTransformation { 
    def transform(rdd: RDD[String]): RDD[Int] = { 
    // Spark won't include MySharedResource.mySharedData in the 
    // serialized task sent from driver, since it's "static" 
    rdd.map(r => MySharedResource.mySharedData(r)) 
    } 
} 

, 각 집행에 mySharedData의 복사본 하나를해야합니다.

+0

예, 이미 브로드 캐스트 기능을 알고 있지만 사용하고 싶지 않은 이유는 내 작업이 C 프로그램을 통해 컴파일 된 코드 인 실행 파일을 실행하기 때문입니다. 그 데이터를 HDFS 파일에서 곧바로로드하고 데이터를 공유 메모리에 저장하여 해당 작업에서 사용할 수 있도록하고 싶습니다. 물론 나는 C 코드를 조금 수정해야 할 것이다. 게으른 발은이 목적에 더 적합 해 보입니다. 그래서, 나는 그것을 조사 할 것이다. – pythonic