카프카에서 데이터를 읽고 처리하는 Spark 응용 프로그램이 있습니다. maven을 사용하여 fat jar를 생성하고 명령 : mvn clean compile assembly:single
을 작성하면 spark-submit
명령 도구 (No Yarn, 독립 실행 형 클러스터)를 사용하여 Spark 원격 클러스터에 성공적으로 제출할 수 있습니다. 이제 IntelliJ IDE에서 팻 항아리를 생성하지 않고 동일한 응용 프로그램을 실행하려고합니다. 내가 IDE에서 응용 프로그램을 실행 한 후에는 클러스터의 마스터에 있지만 잠시 오류 후 작업을 제출 :IDE를 사용하는 원격 스파크 클러스터에서 Spark 응용 프로그램을 실행하는 중 오류가 발생했습니다.
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
나는 pom.xml 파일에 의존성이 스파크 응용 프로그램에 액세스 할 수 없습니다 생각합니다. 나는 원격 클러스터 아파치 FLINK 응용 프로그램을 실행하는 동일한 문제가 :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Saprk</groupId>
<artifactId>SparkPMUProcessing</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>SparkTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
</project>
포인트 : 여기
은 pom.xml 파일입니다. Flink뿐만 아니라 Spark, fat jar 및 terminal 명령을 사용하여 올바르게 실행하여 클러스터에 제출하십시오.업데이트 : 사용 방법 setJars
사용 방법 종속성 jar 파일과 java.lang.ClassNotFoundException:
오류 유형을 소개합니다. 지금은 말한다 :
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
여기 내 코드는 다음과 같습니다
public class SparkTest {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("PMUStreaming").setMaster("spark://namenode1:7077")
.set("spark.deploy.mode", "client")
.set("spark.executor.memory", "700m").setJars(new String[]{
"/home/xxx/SparkRunningJars/kafka_2.11-0.10.0.0.jar",
"/home/xxx/SparkRunningJars/kafka-clients-0.10.0.0.jar",
"/home/xxx/SparkRunningJars/spark-streaming-kafka-0-10_2.11-2.2.0.jar"
});
Map<String, Object> kafkaParams = new HashMap<>();
Collection<String> TOPIC = Arrays.asList(args[6]);
final String BOOTSTRAPSERVERS = args[0];
final String ZOOKEEPERSERVERS = args[1];
final String ID = args[2];
final int BATCH_SIZE = Integer.parseInt(args[3]);
final String PATH = args[4];
final String READMETHOD = args[5];
kafkaParams.put("bootstrap.servers", BOOTSTRAPSERVERS);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", ID);
kafkaParams.put("auto.offset.reset", READMETHOD);
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("metadata.max.age.ms", 30000);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(BATCH_SIZE));
JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe(TOPIC, kafkaParams)
);
stream.map(record -> getTime(record.value()) + ":"
+ Long.toString(System.currentTimeMillis()) + ":"
+ Arrays.deepToString(finall(record.value()))
+ ":" + Long.toString(System.currentTimeMillis()))
.map(record -> record + ":"
+ Long.toString(Long.parseLong(record.split(":")[3]) - Long.parseLong(record.split(":")[1])))
.repartition(1)
.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> rdd, Time time) throws Exception {
if (rdd.count() > 0) {
rdd.saveAsTextFile(PATH + "/" + time.milliseconds());
}
}
});
ssc.start();
ssc.awaitTermination();
}
스파크 코드가 추가되었습니다. – soheil
프로젝트의 jar도'setJars'에 추가해야합니다. 올바른 코드로 답변을 업데이트했습니다. – Panos
네 말이 맞아. 나는 그것을 시도했지만 매번 클래스의 새로운 항아리를 생성하지 않고 코드를 변경할 수 있기를 원한다! – soheil