6

여러 필드가 포함 된 데이터 프레임을 읽고 JSON 데이터를 두 열을 기준으로 다시 분할하고 Pandas로 변환하는 JSON 데이터가 있습니다.Pyspark 간단한 재 파티션 및 toPandas()가 600,000 개 이상의 행에서 완료되지 않습니다.

이 작업은 약간의 오류가있는 단지 600,000 행의 데이터에서 EMR을 사용하지 못합니다. 나는 또한 스파크 드라이버의 메모리 설정을 늘렸고 여전히 해상도를 보지 못했다.

conf = SparkConf().setAppName('myapp1') 
conf.set('spark.yarn.executor.memoryOverhead', 8192) 
conf.set('spark.executor.memory', 8192) 
conf.set('spark.driver.memory', 8192) 
sc = SparkContext(conf=conf) 

내가 얻을 오류는 다음과 같습니다 :

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 

를 코드는 잘 최대 작동 다음과 같이

enhDataDf = (
    sqlContext 
    .read.json(sys.argv[1]) 
    ) 

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
    ) 
enhDataDf = sqlContext.createDataFrame(enhDataDf) 
enhDataDf = (
    enhDataDf 
    .toJSON() 
    .saveAsTextFile(sys.argv[2]) 
    ) 

내 스파크 설정

은 다음과 같습니다 여기

내 pyspark 코드 ~ 600,000 개의 JSON 라인을 지원합니다. 그런 다음 계속 실패합니다.

무슨 일이 일어나고 있는지 그리고이 문제를 디버그/수정하는 방법에 대한 의견이 있으십니까?

답변

3

: 당신이 팔을 사용하는 경우, 예를 들어, 다음 집행 및/또는 드라이버 4를 사용 데이터이므로 레코드 수가 증가하면 드라이버가 실패하게됩니다.

귀하의 의견에 따르면 정확한 파이프 라인을 사용하십시오. 즉, 무대 전체가 쓸모없고 부정확하다는 의미입니다.당신은 당신이 할 수있는 열을 기준으로 데이터를 기록하려면

sqlContext.createDataFrame(enhDataDf) 

: 데이터를 수집하고 더 병렬 때 당신이 때 스파크 DataFrame을 다시 만들 유지됩니다

.repartition('column1', 'column2') 

만든 그 분할을 보장있다 직접 :

(sqlContext 
    .read.json(sys.argv[1]) 
    .repartition('column1', 'column2') 
    .write 
    .json(sys.argv[2])) 

RDD에 toPandas 및 전환 중간 스킵. 귀하의 의견에 따라

: toPandas는 다음 목적을 제공

경우는 항상 파이프 라인에 제한 요소를 유지하고있는 유일한 직접 솔루션은 스케일 업하는 드라이버 노드입니다.

  • 당신이 이미이없는 불꽃의 상단에 사용하는 알고리즘을 재 구현 : 당신은 수집 된 데이터에 적용되는 정확한 알고리즘에 따라 대체 옵션을 고려할 수 있습니다.
  • 더 나은 SciPy 스택 상호 운용성을 가진 대체 프레임 워크를 고려하십시오 (예 : Dask).
1

이 내 내가 응용 프로그램과 함께 메모리 문제를 나타냅니다 클러스터 모드에서 PySpark 작업을 lanching했다 Spark – Container exited with a non-zero exit code 143, 생각 나게.

먼저 어떤 기계가 고장 나고 있는지, 운전자 또는 집행자인지 확인하고, 더 나은 동작을 목표로 삼을 수 있도록 노력하십시오. 내가 읽은 것으로부터 집행자가되어야합니다.


난 당신이 이미 memoryOverhead 구성, 좋은를 설정 볼 수 있습니다. 이제 memory 구성에 집중하겠습니다.

... Spark (PySpark)로 Python을 실행하면 모든 코드가 힙에서 실행됩니다. 이러한 이유 때문에, "많이 사용하지 않는"메모리를 할당해야합니다 (총 메모리에서 사용하도록 허용 된 메모리를 잘라 버릴 수 있습니다. 즉, 사용 가능한 전체 메모리가 20G이고 12G를 요청할 경우 . 8 세대가 사용하는 내 파이썬 응용 프로그램에 남아있을 것입니다

을 그래서 감소 속성 시도, 예를 감소


다음 목표 :! #cores

감소를 그것도,

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
) 

.toPandas()를 수집 : 나는 문제가 코드의 다음 부분에서 오는 것으로 생각

spark.executor.cores      4 
spark.driver.cores       4 
+0

이것은 여전히 ​​도움이되지 않습니다. 동일한 오류 메시지로 계속 오류를 가져 오십시오. 나는 말 그대로 M4.2xlarge 인스턴스에서 32GB 메모리와 설정을 실행하고 있습니다. 매우 신나는 오류를 제공하고 장님을 피할 수 없게 만드는 것은 매우 성가시다. – Gopala

+0

흠 나는 심지어 당신에게서 upvote @ Gopala를 보지 않는다, 그래서 그것은 나의 대답이 나쁘다는 것을 의미한다, 나는 그것을 삭제해야한다? – gsamaras

+3

나는 대답이 나쁘지 않다고 생각한다. 통찰력과 유용한 링크가 있습니다. 단지 그것이 내 문제를 해결하지 못했고 나는 더 이상의 도움이 있는지를 기다리고있다. – Gopala