
Creating a Spark RDD from a Hadoop sequence file is not working

I am not familiar with Spark, and I am trying to create an RDD from a Hadoop sequence file, but I get the error below. I searched articles online, but none of the suggested solutions sorted it out. Can anyone help me fix this?

My pom file looks like this:

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<parent> 
    <groupId>com.pearson.tellurium</groupId> 
    <artifactId>pw-analytics</artifactId> 
    <version>1.0-SNAPSHOT</version> 
</parent> 

<modelVersion>4.0.0</modelVersion> 
<artifactId>aggregation-engine</artifactId> 
<name>Pearson Writer Analytics - Aggregation Engine</name> 
<packaging>jar</packaging> 

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <spark.version>1.6.1</spark.version> 
    <json4s.version>3.3.0</json4s.version> 
    <scala.compat.version>2.10</scala.compat.version> 
    <configuration>test</configuration> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>${spark.version}</version> 
     <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-hive_2.10</artifactId> 
     <version>${spark.version}</version> 
     <!--<scope>provided</scope>--> 
     <exclusions> 
      <exclusion> 
       <artifactId>jackson-core-asl</artifactId> 
       <groupId>org.codehaus.jackson</groupId> 
      </exclusion> 
      <exclusion> 
       <artifactId>jackson-mapper-asl</artifactId> 
       <groupId>org.codehaus.jackson</groupId> 
      </exclusion> 
     </exclusions> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.2.4</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-aws</artifactId> 
     <version>2.6.4</version> 
    </dependency> 
    <dependency> 
     <groupId>org.json4s</groupId> 
     <artifactId>json4s-core_2.10</artifactId> 
     <version>${json4s.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.json4s</groupId> 
     <artifactId>json4s-native_2.10</artifactId> 
     <version>${json4s.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>mysql</groupId> 
     <artifactId>mysql-connector-java</artifactId> 
     <version>5.1.38</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scalatest</groupId> 
     <artifactId>scalatest_${scala.compat.version}</artifactId> 
     <version>2.2.4</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
    </dependency> 
    <dependency> 
     <groupId>org.mockito</groupId> 
     <artifactId>mockito-all</artifactId> 
     <version>2.0.2-beta</version> 
    </dependency> 
</dependencies> 

<build> 
    <filters> 
     <filter>src/main/filters/${configuration}.properties</filter> 

    </filters> 
    <finalName>pw-aggregation-engine</finalName> 

    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-assembly-plugin</artifactId> 
      <executions> 
       <execution> 
        <id>make-assembly</id> 
        <phase>package</phase> 
        <goals> 
         <goal>single</goal> 
        </goals> 
        <configuration> 
         <descriptorRefs> 
          <descriptorRef>jar-with-dependencies</descriptorRef> 
         </descriptorRefs> 
        </configuration> 
       </execution> 
       <execution> 
        <id>dist</id> 
        <phase>package</phase> 
        <goals> 
         <goal>single</goal> 
        </goals> 
        <configuration> 
         <descriptors> 
          <descriptor>src/main/assembly/bin.xml</descriptor> 
         </descriptors> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 
     <plugin> 
      <groupId>org.scala-tools</groupId> 
      <artifactId>maven-scala-plugin</artifactId> 
      <executions> 
       <execution> 
        <id>compile</id> 
        <goals> 
         <goal>compile</goal> 
        </goals> 
        <phase>compile</phase> 
       </execution> 
       <execution> 
        <id>test-compile</id> 
        <goals> 
         <goal>testCompile</goal> 
        </goals> 
        <phase>test-compile</phase> 
       </execution> 
       <execution> 
        <phase>process-resources</phase> 
        <goals> 
         <goal>compile</goal> 
        </goals> 
       </execution> 
      </executions> 
     </plugin> 
     <plugin> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <configuration> 
       <source>1.5</source> 
       <target>1.5</target> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>net.alchim31.maven</groupId> 
      <artifactId>scala-maven-plugin</artifactId> 
     </plugin> 
     <plugin> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest-maven-plugin</artifactId> 
      <configuration> 
       <forkMode>once</forkMode> 
       <parallel>false</parallel> 
       <argLine>-Xmx512M -XX:PermSize=128M -XX:+CMSClassUnloadingEnabled 
        -XX:MaxPermSize=128M -XX:ReservedCodeCacheSize=512M 
       </argLine> 
       <reportsDirectory>$outputDirectory/scalatest-reports</reportsDirectory> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-shade-plugin</artifactId> 
      <version>2.4.1</version> 
      <executions> 
       <execution> 
        <phase>package</phase> 
        <goals> 
         <goal>shade</goal> 
        </goals> 
        <configuration> 
         <minimizeJar>true</minimizeJar> 
         <createDependencyReducedPom>false</createDependencyReducedPom> 
         <artifactSet> 
          <excludes> 
           <exclude>junit:junit</exclude> 
           <exclude>org.aspectj</exclude> 
          </excludes> 
         </artifactSet> 
        </configuration> 
       </execution> 
      </executions> 
      <configuration> 
       <filters> 
        <filter> 
         <artifact>*:*</artifact> 
         <excludes> 
          <exclude>META-INF/*.SF</exclude> 
          <exclude>META-INF/*.DSA</exclude> 
          <exclude>META-INF/*.RSA</exclude> 
         </excludes> 
        </filter> 
       </filters> 
      </configuration> 
     </plugin> 
     <plugin> 
      <artifactId>maven-resources-plugin</artifactId> 
      <version>2.7</version> 
      <executions> 
       <execution> 
        <id>copy-resources</id> 
        <!-- here the phase you need --> 
        <phase>validate</phase> 
        <goals> 
         <goal>copy-resources</goal> 
        </goals> 
        <configuration> 
         <outputDirectory>${basedir}/target/</outputDirectory> 
         <resources> 
          <resource> 
           <directory>src/main/resources</directory> 
           <filtering>true</filtering> 

          </resource> 
         </resources> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 
    </plugins> 
</build> 

This is how I create my SparkContext:

import org.apache.spark.{SparkConf, SparkContext}

def initializeSpark(jobPrefix: String): SparkContext = {
  LOG.info("Initializing Job...")
  LOG.info("Job Name : " + jobPrefix + java.time.LocalDateTime.now.toString)
  // Run locally with as many worker threads as there are logical cores.
  val sparkConf = new SparkConf()
    .setAppName(jobPrefix + java.time.LocalDateTime.now.toString)
    .setMaster("local[*]")
  new SparkContext(sparkConf)
}
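
The stack trace further down shows this running under a ScalaTest FlatSpec with BeforeAndAfter. For reference, here is a minimal sketch of wiring the context lifecycle into such a suite (the class and test names are assumptions based on the stack trace, not the original source):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FlatSpec}

// Sketch only: class and test names are assumptions, not the original source.
class TestSparkContextSketch extends FlatSpec with BeforeAndAfter {
  private var sc: SparkContext = _

  before {
    sc = new SparkContext(
      new SparkConf().setAppName("test").setMaster("local[*]"))
  }

  after {
    // Stop the local context so it does not leak into later suites.
    if (sc != null) sc.stop()
  }

  "the test SparkContext" should "run a trivial local job" in {
    assert(sc.parallelize(1 to 10).count() == 10)
  }
}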

I have placed the sequence file I created in my src/test/resources directory, and I read it from that location:

import java.io.File

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.util.Try

def parseRDD(sparkContext: SparkContext): RDD[String] = {
  val filePath = new File("").getAbsolutePath + "/src/test/resources/1_0_00000000000000000218"
  val rdd = sparkContext.sequenceFile(filePath, classOf[LongWritable], classOf[BytesWritable])
    .map { case (key, value) =>
      // A BytesWritable's backing array is padded, so keep only the
      // first getLength bytes of real payload.
      val bytes = value.getBytes
      (key.get(), new String(bytes.slice(0, value.getLength)))
    }
    .map(_._2)

  def emptyStringRDD(): RDD[String] =
    sparkContext.parallelize(Seq.empty[String])

  // Materialize a single record to confirm the file is readable;
  // fall back to an empty RDD if it is not.
  val validatedRDD = {
    val r = Try(rdd.first)
    if (r.isSuccess) {
      rdd
    } else {
      LOG.debug("Returning Empty RDD: " + r.failed.get.getMessage)
      emptyStringRDD()
    }
  }
  validatedRDD
}
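
As an aside, here is a minimal sketch of how a comparable (LongWritable, BytesWritable) sequence file could be generated for tests using Spark's built-in Writable conversions; the output path and sample records are made up, not from the original project:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: path and records are hypothetical. Spark's implicit
// WritableFactory conversions turn (Long, Array[Byte]) pairs into
// (LongWritable, BytesWritable) on disk, matching what parseRDD expects.
object WriteTestSequenceFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("write-test-seqfile").setMaster("local[*]"))
    val records: Seq[(Long, Array[Byte])] = Seq(
      (1L, """{"event":"started"}""".getBytes("UTF-8")),
      (2L, """{"event":"stopped"}""".getBytes("UTF-8"))
    )
    // Note: this writes a directory of part files, not a single file.
    sc.parallelize(records).saveAsSequenceFile("target/test-seqfile")
    sc.stop()
  }
}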

When I run the test that calls parseRDD, I get the following error:

An exception or error caused a run to abort: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class; 
    java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class; 
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49) 
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala) 
at com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61) 
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19) 
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35) 
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala) 
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:81) 
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala) 
at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) 
at org.apache.spark.SparkContext.sequenceFile(SparkContext.scala:1166) 
at com.pearson.tellurium.analytics.aggregation.MockRDDCreator$.parseRDD(MockRDDCreator.scala:25) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext$$anonfun$3.apply$mcV$sp(TestSparkContext.scala:41) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext$$anonfun$3.apply(TestSparkContext.scala:39) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext$$anonfun$3.apply(TestSparkContext.scala:39) 
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) 
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) 
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
at org.scalatest.Transformer.apply(Transformer.scala:22) 
at org.scalatest.Transformer.apply(Transformer.scala:20) 
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647) 
at org.scalatest.Suite$class.withFixture(Suite.scala:1122) 
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683) 
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644) 
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656) 
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656) 
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) 
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext.org$scalatest$BeforeAndAfter$$super$runTest(TestSparkContext.scala:16) 
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext.runTest(TestSparkContext.scala:16) 
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714) 
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714) 
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) 
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) 
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390) 
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427) 
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) 
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) 
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) 
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714) 
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683) 
at org.scalatest.Suite$class.run(Suite.scala:1424) 
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683) 
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760) 
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760) 
at org.scalatest.SuperEngine.runImpl(Engine.scala:545) 
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext.org$scalatest$BeforeAndAfter$$super$run(TestSparkContext.scala:16) 
at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241) 
at com.pearson.tellurium.analytics.aggregation.TestSparkContext.run(TestSparkContext.scala:16) 
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) 
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) 
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) 
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) 
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) 
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) 
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) 
at org.scalatest.tools.Runner$.run(Runner.scala:883) 
at org.scalatest.tools.Runner.run(Runner.scala) 
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) 
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

Answer


I was able to solve this. It was indeed caused by incompatible versions of the Jackson libraries. I am using Spark 1.6 with Hadoop 2.6, and moving to jackson-databind version 2.4.4 fixed the problem completely; I had previously been using version 2.2.4, which is what caused the error. (Spark 1.6 pulls in jackson-module-scala 2.4.x, so, as far as I can tell, forcing the older 2.2.4 databind onto the classpath is what broke BigDecimalDeserializer at runtime.) The dependency below resolved the incompatibility:

<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId> 
    <artifactId>jackson-databind</artifactId> 
    <version>2.4.4</version> 
</dependency>
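
If you hit something similar, running mvn dependency:tree -Dincludes=com.fasterxml.jackson.core shows which jackson-databind version Maven actually resolves. As a runtime double-check (a sketch, not from the original post), the mapper also reports its own version:

import com.fasterxml.jackson.databind.ObjectMapper

// Sketch: prints the jackson-databind version actually loaded at runtime;
// after the dependency change above it should report 2.4.4.
object JacksonVersionCheck {
  def main(args: Array[String]): Unit = {
    println(new ObjectMapper().version())
  }
}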