2017-12-14 10 views
1

카프카에서 데이터를 읽고 처리하는 Spark 응용 프로그램이 있습니다. maven을 사용하여 fat jar를 생성하고 명령 : mvn clean compile assembly:single을 작성하면 spark-submit 명령 도구 (No Yarn, 독립 실행 형 클러스터)를 사용하여 Spark 원격 클러스터에 성공적으로 제출할 수 있습니다. 이제 IntelliJ IDE에서 팻 항아리를 생성하지 않고 동일한 응용 프로그램을 실행하려고합니다. 내가 IDE에서 응용 프로그램을 실행 한 후에는 클러스터의 마스터에 있지만 잠시 오류 후 작업을 제출 :IDE를 사용하는 원격 스파크 클러스터에서 Spark 응용 프로그램을 실행하는 중 오류가 발생했습니다.

java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition 

나는 pom.xml 파일에 의존성이 스파크 응용 프로그램에 액세스 할 수 없습니다 생각합니다. 나는 원격 클러스터 아파치 FLINK 응용 프로그램을 실행하는 동일한 문제가 :

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>Saprk</groupId> 
    <artifactId>SparkPMUProcessing</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
      <plugin> 
       <artifactId>maven-assembly-plugin</artifactId> 
       <configuration> 
        <archive> 
         <manifest> 
          <mainClass>SparkTest</mainClass> 
         </manifest> 
        </archive> 
        <descriptorRefs> 
         <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.7.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>0.10.0.0</version> 
     </dependency> 
    </dependencies> 
</project> 

포인트 : 여기

은 pom.xml 파일입니다. Flink뿐만 아니라 Spark, fat jar 및 terminal 명령을 사용하여 올바르게 실행하여 클러스터에 제출하십시오.

업데이트 : 사용 방법 setJars 사용 방법 종속성 jar 파일과 java.lang.ClassNotFoundException: 오류 유형을 소개합니다. 지금은 말한다 :

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1 

여기 내 코드는 다음과 같습니다

public class SparkTest { 

    public static void main(String[] args) throws InterruptedException { 
     SparkConf conf = new SparkConf().setAppName("PMUStreaming").setMaster("spark://namenode1:7077") 
       .set("spark.deploy.mode", "client") 
       .set("spark.executor.memory", "700m").setJars(new String[]{ 
         "/home/xxx/SparkRunningJars/kafka_2.11-0.10.0.0.jar", 
         "/home/xxx/SparkRunningJars/kafka-clients-0.10.0.0.jar", 
         "/home/xxx/SparkRunningJars/spark-streaming-kafka-0-10_2.11-2.2.0.jar" 
       }); 
     Map<String, Object> kafkaParams = new HashMap<>(); 

     Collection<String> TOPIC = Arrays.asList(args[6]); 
     final String BOOTSTRAPSERVERS = args[0]; 
     final String ZOOKEEPERSERVERS = args[1]; 
     final String ID = args[2]; 
     final int BATCH_SIZE = Integer.parseInt(args[3]); 
     final String PATH = args[4]; 
     final String READMETHOD = args[5]; 

     kafkaParams.put("bootstrap.servers", BOOTSTRAPSERVERS); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", ByteArrayDeserializer.class); 
     kafkaParams.put("group.id", ID); 
     kafkaParams.put("auto.offset.reset", READMETHOD); 
     kafkaParams.put("enable.auto.commit", false); 
     kafkaParams.put("metadata.max.age.ms", 30000); 

     JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(BATCH_SIZE)); 
     JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(
       ssc, 
       LocationStrategies.PreferConsistent(), 
       ConsumerStrategies.<String, byte[]>Subscribe(TOPIC, kafkaParams) 
     ); 


     stream.map(record -> getTime(record.value()) + ":" 
       + Long.toString(System.currentTimeMillis()) + ":" 
       + Arrays.deepToString(finall(record.value())) 
       + ":" + Long.toString(System.currentTimeMillis())) 
       .map(record -> record + ":" 
         + Long.toString(Long.parseLong(record.split(":")[3]) - Long.parseLong(record.split(":")[1]))) 
       .repartition(1) 
       .foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() { 
        private static final long serialVersionUID = 1L; 

        @Override 
        public void call(JavaRDD<String> rdd, Time time) throws Exception { 
         if (rdd.count() > 0) { 
          rdd.saveAsTextFile(PATH + "/" + time.milliseconds()); 
         } 
        } 
       }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 

답변

1

당신이 대답을 본 적이 있나요? 도움이 될 수도 있습니다.

java.lang.ClassCastException using lambda expressions in spark job on remote server

그냥 setJars를 호출 (새로운 String [] { "/ 경로 /로/병// 당신의/class.jar와"}) 당신이 생각처럼 IDE에서 코드를 실행할 경우 SparkConf 인스턴스 . 기본적으로 항아리를 배포 제출 - 스파크, 그래서 이러한 문제

UPDATE 당신은뿐만 아니라 프로젝트의 항아리를 추가해야 는 거기 없다.

그래서 코드는 여기에

public class SparkTest { 

    public static void main(String[] args) throws InterruptedException { 
     SparkConf conf = new SparkConf().setAppName("PMUStreaming").setMaster("spark://namenode1:7077") 
       .set("spark.deploy.mode", "client") 
       .set("spark.executor.memory", "700m").setJars(new String[]{ 
         "/home/xxx/SparkRunningJars/kafka_2.11-0.10.0.0.jar", 
         "/home/xxx/SparkRunningJars/kafka-clients-0.10.0.0.jar", 
         "/home/xxx/SparkRunningJars/spark-streaming-kafka-0-10_2.11-2.2.0.jar", 
         "/path/to/your/project/target/SparkPMUProcessing-1.0-SNAPSHOT.jar" 
       }); 
     Map<String, Object> kafkaParams = new HashMap<>(); 

     Collection<String> TOPIC = Arrays.asList(args[6]); 
     final String BOOTSTRAPSERVERS = args[0]; 
     final String ZOOKEEPERSERVERS = args[1]; 
     final String ID = args[2]; 
     final int BATCH_SIZE = Integer.parseInt(args[3]); 
     final String PATH = args[4]; 
     final String READMETHOD = args[5]; 

     kafkaParams.put("bootstrap.servers", BOOTSTRAPSERVERS); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", ByteArrayDeserializer.class); 
     kafkaParams.put("group.id", ID); 
     kafkaParams.put("auto.offset.reset", READMETHOD); 
     kafkaParams.put("enable.auto.commit", false); 
     kafkaParams.put("metadata.max.age.ms", 30000); 

     JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(BATCH_SIZE)); 
     JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(
       ssc, 
       LocationStrategies.PreferConsistent(), 
       ConsumerStrategies.<String, byte[]>Subscribe(TOPIC, kafkaParams) 
     ); 


     stream.map(record -> getTime(record.value()) + ":" 
       + Long.toString(System.currentTimeMillis()) + ":" 
       + Arrays.deepToString(finall(record.value())) 
       + ":" + Long.toString(System.currentTimeMillis())) 
       .map(record -> record + ":" 
         + Long.toString(Long.parseLong(record.split(":")[3]) - Long.parseLong(record.split(":")[1]))) 
       .repartition(1) 
       .foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() { 
        private static final long serialVersionUID = 1L; 

        @Override 
        public void call(JavaRDD<String> rdd, Time time) throws Exception { 
         if (rdd.count() > 0) { 
          rdd.saveAsTextFile(PATH + "/" + time.milliseconds()); 
         } 
        } 
       }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 
+0

스파크 코드가 추가되었습니다. – soheil

+0

프로젝트의 jar도'setJars'에 추가해야합니다. 올바른 코드로 답변을 업데이트했습니다. – Panos

+0

네 말이 맞아. 나는 그것을 시도했지만 매번 클래스의 새로운 항아리를 생성하지 않고 코드를 변경할 수 있기를 원한다! – soheil

0

내 build.sbt 종속성입니다해야합니다. 그것은 sbt 구성이지만 의존성을 지정하는 데 필요한 것을 인식 할 수 있습니다.

lazy val commonLibraryDependencies = Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided", 
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", 
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", 
    "org.apache.spark" %% f"spark-streaming-kafka-$kafkaVersion" % sparkVersion, 
    "org.apache.spark" %% f"spark-sql-kafka-$kafkaVersion" % sparkVersion, 
) 
+0

필자는 필자에게 필 요한 의존성을 알았고 팻 단지를 올바르게 실행할 수 있습니다. 내 문제는 IDE에서 코드를 실행할 수 없다는 것입니다. – soheil

+0

스파크 코드가 추가되었습니다. – soheil