2017-12-12

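Simple ETL job in AWS Glue says "File already exists"

We are evaluating AWS Glue for a big data project that involves some ETL. We added a crawler, which correctly picks up a CSV file from S3. As a first step, we just want to convert that CSV to JSON and drop the file in another S3 location (same bucket, different path).

We used the script as provided by AWS (no custom script here) and simply mapped all the columns.

The target folder is empty (the job was only just created), but the job fails with "File already exists" (snapshot). The S3 location where we intend to drop the output was empty before we started the job. After the error, however, we do see two files there, and they appear to be partial files (snapshot).

Any ideas about what might be going on?

Here is the full stack trace: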

 
    Container: container_1513099821372_0007_01_000001 on ip-172-31-49-38.ec2.internal_8041 
    LogType:stdout 
    Log Upload Time:Tue Dec 12 19:12:04 +0000 2017 
    LogLength:8462 
    Log Contents: 
    Traceback (most recent call last): 
    File "script_2017-12-12-19-11-08.py", line 30, in <module>
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = 
    { 
     "path": "s3://primero-viz/output/tcw_entries" 
    } 
    , format = "json", transformation_ctx = "datasink2") 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 523, in from_options 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 175, in write_dynamic_frame_from_options 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 198, in write_from_options 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
    py4j.protocol.Py4JJavaError: An error occurred while calling o86.pyWriteDynamicFrame. 
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-63-141.ec2.internal, executor 1): java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000 
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:604) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793) 
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176) 
    at com.amazonaws.services.glue.hadoop.TapeOutputFormat.getRecordWriter(TapeOutputFormat.scala:65) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 

    Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996) 
    at com.amazonaws.services.glue.HadoopDataSink$$anonfun$2.apply$mcV$sp(DataSink.scala:192) 
    at com.amazonaws.services.glue.HadoopDataSink.writeDynamicFrame(DataSink.scala:202) 
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:48) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
    Caused by: java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000 

Answers

"The target folder is empty"

Empty is not the same as non-existent. It does not look like write_dynamic_frame supports a write mode, so it may be necessary to delete the directory first.
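
One way to clear the output location before the job runs is sketched below. This is only an illustration, not part of the generated Glue script: the helper name `purge_prefix` is hypothetical, and it assumes a boto3 S3 client is passed in and that the job's role is allowed to list and delete objects under the prefix.

```python
def purge_prefix(s3_client, bucket, prefix):
    """Delete every object under `prefix` in `bucket`; return how many were deleted.

    `s3_client` is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page has no "Contents" key when nothing matches the prefix.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            # list_objects_v2 returns at most 1000 keys per page, which is
            # also the per-request limit of delete_objects.
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted


# Hypothetical usage with the path from the question:
#   import boto3
#   purge_prefix(boto3.client("s3"), "primero-viz", "output/tcw_entries/")
```

S3 has no real directories, so "deleting the directory" here means deleting every object whose key starts with the prefix, including any partial files left behind by a failed run.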

Thanks! I tried that, with no luck. Also, the error message says that a specific file name already exists: "s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000". So I don't think it is related to the folder... I'll double-check anyway. Thanks again. – mbirnios

Set the write mode to "append" if the load is incremental, or to "overwrite" if it is a full load.

An example could be:

    events.toDF().write.partitionBy("partition_0", "partition_1").json(events_dir, mode="append")
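
Applied to the script in the question, this would mean converting the DynamicFrame (`applymapping1`) to a Spark DataFrame with `toDF()` and writing it through the DataFrame writer instead of `write_dynamic_frame`. A minimal sketch, assuming that approach; the helper `write_json` and its parameters are hypothetical names used only for illustration:

```python
def write_json(df, path, incremental, partition_cols=()):
    """Write a Spark DataFrame as JSON, appending or overwriting.

    `df` is assumed to be a pyspark.sql.DataFrame, for example the result
    of calling toDF() on a Glue DynamicFrame. Returns the mode that was used.
    """
    mode = "append" if incremental else "overwrite"
    writer = df.write.mode(mode)
    if partition_cols:
        # Partition columns are set on the writer before calling json().
        writer = writer.partitionBy(*partition_cols)
    writer.json(path)
    return mode


# Hypothetical usage inside the Glue script from the question:
#   write_json(applymapping1.toDF(),
#              "s3://primero-viz/output/tcw_entries",
#              incremental=False)
```

Note that "overwrite" replaces whatever already exists at the target path, so it only fits a full reload.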