
I am trying to read Parquet files stored in AWS S3, and the job fails on the Spark side with an S3 error: AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xxxxx, AWS Error Code: null

17/12/19 11:27:40 DEBUG DAGScheduler: ShuffleMapTask finished on 0 
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ResultStage 2) 
17/12/19 11:27:40 DEBUG DAGScheduler: missing: List(ShuffleMapStage 1) 
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ShuffleMapStage 1) 
17/12/19 11:27:40 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1, runningTasks: 2 
17/12/19 11:27:40 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 4, ip-xxx-xxx-xxx-xxx.ec2.internal): com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xxxxxxx, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: xxxxxxxx/xxxxxxxxx= 
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798) 
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421) 
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) 
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976) 
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:71) 
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385) 
    at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157) 
    at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) 
    at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:180) 
    at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 

17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ResultStage 2) 

After enabling DEBUG logging in Spark, I could also see an NPE while the job was being submitted against S3:

17/12/19 11:27:39 INFO SparkContext: Starting job: count at TestRead.scala:54 
17/12/19 11:27:39 INFO DAGScheduler: Registering RDD 4 (count at TestRead.scala:54) 
17/12/19 11:27:39 INFO DAGScheduler: Got job 1 (count at TestRead.scala:54) with 1 output partitions 
17/12/19 11:27:39 INFO DAGScheduler: Final stage: ResultStage 2 (count at TestRead.scala:54) 
17/12/19 11:27:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1) 
17/12/19 11:27:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1) 
17/12/19 11:27:39 DEBUG DAGScheduler: submitStage(ResultStage 2) 
17/12/19 11:27:39 DEBUG DAGScheduler: missing: List(ShuffleMapStage 1) 
17/12/19 11:27:39 DEBUG DAGScheduler: submitStage(ShuffleMapStage 1) 
17/12/19 11:27:39 DEBUG DAGScheduler: missing: List() 
17/12/19 11:27:39 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[4] at count at TestRead.scala:54), which has no missing parents 
17/12/19 11:27:39 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 1) 
17/12/19 11:27:39 DEBUG ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Failed to use InputSplit#getLocationInfo. 
java.lang.NullPointerException 
    at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114) 
    at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:114) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at org.apache.spark.rdd.HadoopRDD$.convertSplitLocationInfo(HadoopRDD.scala:412) 
    at org.apache.spark.rdd.SqlNewHadoopRDD.getPreferredLocations(SqlNewHadoopRDD.scala:259) 
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553) 
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1519) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$14.apply(DAGScheduler.scala:969) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$14.apply(DAGScheduler.scala:969) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:969) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:924) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:923) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:923) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
17/12/19 11:27:39 DEBUG ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Failed to use InputSplit#getLocationInfo. 

I am not sure why I am getting these errors. Here is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

val conf = new SparkConf(true).setAppName("TestRead")
conf.set(SPARK_CASS_CONN_HOST, config.cassHost)
conf.set("spark.cassandra.connection.timeout_ms", "10000")

val sc = new SparkContext(conf)
// Configure the s3a connector with static credentials.
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.access.key", config.s3AccessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", config.s3SecretKey)

val sqlContext = new SQLContext(sc)

// Read the Parquet data from S3.
val data = sqlContext.read.parquet(s"s3a://${config.s3bucket}/${config.s3folder}")

println(s"data size is ${data.count()}")
data.show()

if (config.cassWriteToTable) {
  println("Writing to Cassandra Table")
  data.write
    .mode(SaveMode.Append)
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> config.cassTable, "keyspace" -> config.cassKeyspace))
    .save()
}

println("Stopping TestRead...")
sc.stop()
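
One detail worth calling out in the configuration above: if config.s3AccessKey/config.s3SecretKey are temporary STS credentials, S3 returns 403 whenever they are used without their session token. A hedged sketch of the extra settings, assuming a hadoop-aws build new enough (2.8+) to support session tokens and a hypothetical config.s3SessionToken field:

// Sketch only: relevant when the keys come from STS. Both properties are
// supported from hadoop-aws 2.8 onward; config.s3SessionToken is a
// hypothetical field standing in for wherever the token is stored.
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc.hadoopConfiguration.set("fs.s3a.session.token", config.s3SessionToken)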

I have included the following dependencies in my build.sbt file:

"org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided,test" , 
    "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided", 
    "com.typesafe.play" % "play-json_2.10" % "2.4.6" excludeAll(ExclusionRule(organization = "com.fasterxml.jackson.core")), 
    "mysql" % "mysql-connector-java" % "5.1.39", 
    "com.amazonaws" % "aws-java-sdk-pom" % "1.11.7" exclude("commons-beanutils","commons-beanutils") exclude("commons-collections","commons-collections") excludeAll ExclusionRule(organization = "javax.servlet") 

What could be the problem here?
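
For context: nothing in this list provides the s3a filesystem itself, which ships in the hadoop-aws module, and that module's version has to match the Hadoop build Spark runs against. A hedged build.sbt sketch, assuming a Hadoop 2.7.x runtime (hadoop-aws 2.7.x was built against aws-java-sdk 1.7.4, not the 1.11.x line above):

// Sketch only: both versions are assumptions; match them to the cluster's
// Hadoop build. Mixing hadoop-aws 2.7.x with aws-java-sdk 1.11.x jars is a
// common source of classpath trouble.
"org.apache.hadoop" % "hadoop-aws" % "2.7.3" % "provided",
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"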

Answer


403/Forbidden: the credentials you are using do not have permission to read the file you are trying to access.
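
A quick way to confirm this is to replay the SDK call that fails in the stack trace, getObjectMetadata, outside Spark with the same keys. A minimal sketch against the aws-java-sdk 1.x API already on the classpath; the object key is a placeholder for one of the Parquet part files:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

// HEAD a single object with the same credentials the Spark job uses; a 403
// here confirms the problem is permissions, not Spark. The key below is a
// placeholder.
val s3 = new AmazonS3Client(new BasicAWSCredentials(config.s3AccessKey, config.s3SecretKey))
val meta = s3.getObjectMetadata(config.s3bucket, "path/to/one-part-file.parquet")
println(s"object size: ${meta.getContentLength}")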

Regarding the NPE: file a bug report against Spark on issues.apache.org, and they will work out who is to blame.

Before you do, though, make sure you are on a recent Spark version and search for that NPE first; if it has already been fixed, there is no need to file a duplicate.