2017-09-08 16 views
0

Mesos 클러스터에서 실행중인 Spark 2.1.1 작업이 있습니다. Spark UI는 32 명의 액티브 실행 프로그램을 보여 주며, RDD.getNumPartitions는 28 개의 파티션을 보여줍니다. 그러나 한 (임의의) 집행자 만 작업을하고 있으며 다른 모든 집주인은 완료로 표시되어 있습니다. executor 코드 (stdout)에 디버그 명령문을 추가하고 하나의 실행 프로그램 만이이를 보여줍니다. 전체 파이프 라인은 다음과 같이 구성됩니다. ID 목록 가져 오기 - 각 ID에 대한 JSON 데이터 다운로드 -> JSON 데이터 구문 분석 -> S3에 저장.spark에서 단일 실행 프로그램에서 여러 파티션으로 작업 실행 중

stage 1: val ids=session.sparkContext.textFile(path).repartition(28) -> RDD[String] 

//ids.getNumPartitions shows 28 
stage 2: val json=ids.mapPartitions { keys => 
    val urlBuilder ... 
    val buffer .... 
    keys map { key => 
    val url=urlBuilder.createUrl(id) //java.net.URL 
    val json=url.openStream() ... //download text to buffer, close stream 
    (id,json.toString) 
    } 
} -> RDD[Tuple2[String,String]] 

stage 3: val output = json flatMap { t => 
    val values = ... //parse JSON, get values from JSON or empty sequence if not found 
    values map { value => (t._1, value) } 
} -> RDD[Tuple2[String,String]] 

stage 4: output.saveAsTextFile("s3://...") 

이들은 스파크 이진위한 config (설정) 설정 : --driver 메모리 32g --conf spark.driver.cores = 4 --executor 메모리 4g --conf spark.cores.max = 128 - conf spark.executor.cores = 4

하나의 실행 프로그램에서만 실행되는 스테이지는 두 번째 실행 프로그램입니다. 1 단계에서 파티션 수 (repartition (28))를 명시 적으로 지정했습니다. 이전에 그런 행동을 본 사람이 있습니까? 감사합니다,

M

내가 다른 방법 (트래비스의 제안 참조) 가서 일 100 (1 단계 이후) 파티션의 수를 증가 솔루션, 마무리 작업 몇 분 만에. 하지만 부작용이 있습니다. 이제 S3에 100 개의 부분 파일이 있습니다.

+0

작업을 시작하는 데 사용하는 코드와 명령을 제공 할 수 있습니까? –

답변

0

"ids 목록 얻기"후에 .repartition() 단계가 진행되고 있는지 확인하십시오.

먼저 28 개의 파티션으로 빈 세트를 생성 한 다음 ID 목록을 단일 파티션으로 가져 오는 것 같습니다. 예제 코드를 제공 한 후

편집 :

는 각 작업 (즉, 몇 초 내에) 신속하게 완료되어 가능한가? 필자는 수천 개의 뛰어난 작업이 있어도 작업이 짧은 시간 내에 완료 될 때 실행 프로그램을 유휴 상태로 만들도록 일정을 세우지 않는 것을 보았습니다. 이 경우 각 작업을 조금 더 오래 걸릴 수 있도록 파티션이 더 적을 수도 있습니다. 때로는 작업 스케줄러를 트리거하여 실행 프로그램을 유휴 상태로 만들기 위해 더 많은 작업을 예약하기에 충분합니다.

+0

나는 다른 방법으로 가고 (1 단계 이후) 파티션 수를 100으로 늘 렸습니다. 효과가 있었고 작업은 몇 분 만에 완료되었습니다. 하지만 부작용이 있습니다. 이제 S3에 100 개의 부분 파일이 있습니다. – user7606438