2017-02-14 8 views
2

현재 2 개의 테이블을 수집하기 위해 RDS에 연결하는 EMR 클러스터로 작업 중입니다.Amazon EMR Pyspark : rdd.distinct.count() failling

생성 된 2 개의 RDD가 매우 크지 만 다른 .take (x) 작업을 수행 할 수 있습니다.

unique_users = rdd.distinct.count() 

나는 시도했다 :

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

그러나 다음과 같은 작업을 수행 작동하지 않는 RDS에서 가져온 고유 한 사용자의 수를 계산하는 : 나는 또한 같은 더 복잡한 작업을 수행 할 수 있습니다

이전에 메모리 문제인지 여부를 확인하기위한 여러 구성 (문제가 해결되었지만 문제가 해결되지 않음) ...

다음은 현재 오류가 있습니다 :

Traceback (most recent call last): 
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module> 
run_server() 
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server 
AppServer().run() 
File "/home/hadoop/AppEngine/src/server.py", line 45, in run 
api = create_app(self.context, self.apps, self.devices) 
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app 
engine = AppEngine(spark_context, apps, devices) 
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__ 
self.unique_users = self.ratings.distinct().count() 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect 
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms 
Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.collect(RDD.scala:934) 
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
at py4j.Gateway.invoke(Gateway.java:280) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:214) 
at java.lang.Thread.run(Thread.java:745)` 
+0

I는 (실행 프로그램 (13)이 실행하는 작업 중 하나에 의해 야기 종료) 이유 예외'ExecutorLostFailure에서 메시지 참조있어 : 실행자 하트 비트 164,253 ms' 후 초과 – mrsrinivas

답변

1

이 솔루션은 다음이었다. 클러스터에서 사용하고있는 코어 인스턴스의 유형을 사용 가능한 메모리가 더 많은 인스턴스로 변경했습니다 (여기에서 m4.4xlarge).

--driver-memory 2G 
--executor-memory 50G 

또한 때문에 하트 비트 또는의 failling에서 긴 작업을 피하기 위해 이러한 매개 변수를 추가 할 수 있습니다

는 다음 나는 스파크 sumbmit 내 인스턴스의 메모리 할당을 강제로 정확한 매개 변수를했다 메모리 할당 :

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096) 
--conf spark.executor.heartbeatInterval=60s 
1

ExecutorLostFailure 이유 : 집행자 하트 비트는 164,253 MS

이 오류는 집행 인 165 초 후에 응답하지 않았고, (이 죽었다는 가정하에) 살해 된 것을 의미한다

후 시간 초과

실행기를 오랜 기간 동안 점유하고 실행해야하는 작업이있는 경우 spark-submit 명령 줄에서 다음 설정을 시도하면 하트 비트 제한 시간이 엄청나게 길어집니다 여기에 언급 된대로 : https://stackoverflow.com/a/37260231/5088142

어떤 방법을 방법이 문제를 조사하는 방법은 여기에서 찾을 수 있습니다 : https://stackoverflow.com/a/37272249/5088142


아래 질문에서 제기 된 몇 가지 문제를 명확히하기 위해 노력할 것입니다.

Spark Actions vs Transformations

스파크는 transformation을 수행 할 때 그것이 실행되지 않는 지연 계산, 즉 사용한다.

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

spark doc about transformation을 검토 당신은 모든 작업을 볼 수 있습니다 : 당신은 당신이 준 복잡한 작업의 예에서 action

을 수행 할 때 스파크는 아무 작업 (즉, 아무것도/계산 실행되지 않았다)이 없도록에만 실행 예 : map, groupByKeyjoin은 변환입니다.

따라서 실제로 이러한 명령을 실행 한 후에는 아무 것도 수행되지 않았습니다.

행동

생성 된 두 RDD의 차이는 매우 거대하지만 난 .take을 수행 할 수 있습니다 (x)는 그 작업 기타.이 제 X 요소를 리턴 후

take(x) 동작 간의 차이가

count

take(x) 동작은 종료한다. 그들은이 실행되지 않은 것처럼 -

count() 행동은 은 아무 의미가없는 실행 것으로 보인다 있었다 전체 RDD

당신이 (예에서와 같이) 일부 변환을 수행한다는 사실을 통과 한 후에 만 ​​종료됩니다.

실행중인 take(x) 동작은 RDD의 아주 작은 부분 만 사용하기 때문에 어떤 표시도 할 수 없습니다.

결론

당신이 사용하는 데이터의 크기를 지원하지 않는 시스템의 구성처럼 보인다, 또는 코드가 실행 프로그램을 장시간 중단 될 거대한 작업을 생성 (160 초). 나는이 작업을 수행하기에 충분한 메모리를 가지고 있지 않은

:

실제로 RDD에 실행 된 첫 번째 action

문제에 대한 countaction