실 스파클 어플리케이션을 원사 클러스터에서 실행합니다. 내 코드에서 나는 내 데이터 세트에 파티션을 생성하기위한 큐의 수를 가능한 코어를 사용스파크 : 프로그래밍 방식으로 클러스터 코어 수를 얻으십시오.
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
내 질문 : 어떻게 프로그래밍 방식으로 큐의 수를 가능한 코어를 얻을 수 없습니다 구성에 따라 할 수 있습니까?
실 스파클 어플리케이션을 원사 클러스터에서 실행합니다. 내 코드에서 나는 내 데이터 세트에 파티션을 생성하기위한 큐의 수를 가능한 코어를 사용스파크 : 프로그래밍 방식으로 클러스터 코어 수를 얻으십시오.
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
내 질문 : 어떻게 프로그래밍 방식으로 큐의 수를 가능한 코어를 얻을 수 없습니다 구성에 따라 할 수 있습니까?
Spark에서 클러스터의 실행 프로그램 수와 코어 수를 모두 얻는 방법이 있습니다. 과거에 사용한 스칼라 유틸리티 코드는 다음과 같습니다. Java에 쉽게 적응할 수 있어야합니다.
근로자의 수는 집행을 뺀 또는 sc.getExecutorStorageStatus.length - 1
의 수 : 두 가지 핵심 아이디어가있다.
작업자 당 java.lang.Runtime.getRuntime.availableProcessors
을 실행하여 작업자 당 코어 수를 얻을 수 있습니다.
나머지 코드 스칼라 implicits를 사용 SparkContext
편의 수단을 추가 상투적이다. 1.x 년 전 코드를 작성 했으므로 SparkSession
을 사용하지 않습니다.
하나의 마지막 점 : 왜곡 된 데이터의 경우 성능을 향상시킬 수 있으므로 여러 코어로 병합하는 것이 좋습니다. 실제로 데이터 크기와 작업이 공유 클러스터에서 실행 중인지 여부에 따라 1.5 배에서 4 배 사이의 아무 곳이나 사용합니다.
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
sc. getExecutorStorageStatus.length - 1은 나에게 좋다. 고맙습니다 – Rougher
어떤 리소스 관리자를 사용하고 있습니까? yarn 또는 mesos –
나는 원사를 사용하고 있습니다. – Rougher
[yarn cluster API] (https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API)에서 필요한 대기열 매개 변수를 추출한 다음 병합에 사용합니다 –