Kafka Spark Consumer API dependency issue

I am trying to write a Spark consumer to receive data from Kafka, but I cannot find the jar/dependency that provides these two classes referenced in my consumer code:

import org.apache.spark.streaming.scheduler.ReceiverLauncher;
import org.apache.spark.streaming.Scheduler;
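For reference, the consumer.kafka.MessageAndMetadata import in the code below comes from the third-party kafka-spark-consumer project, not from Spark itself. A minimal pom.xml sketch of the dependency that should provide that class (the dibbhatt:kafka-spark-consumer coordinates are taken from that project's README; the version here is a guess and may need adjusting for Spark 2.2.0):

<!-- Sketch only: coordinates from the kafka-spark-consumer README;
     the version is an assumption, check Maven Central for one matching Spark 2.2 -->
<dependency>
    <groupId>dibbhatt</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>1.0.8</version>
</dependency>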
I am using Kafka 0.11.0.1 and Spark 2.2.0 on my local machine. My consumer code is:
package kafkatest2;
import java.io.Serializable;
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.scheduler.ReceiverLauncher;
import consumer.kafka.MessageAndMetadata;
import kafka.consumer.Consumer;
import org.apache.spark.streaming.Scheduler;
public class ConsumerTest implements Serializable {

  private static final long serialVersionUID = 4332618245650072140L;

  public void start() throws InstantiationException, IllegalAccessException,
      ClassNotFoundException {
    run();
  }
  private void run() {
    Properties props = new Properties();
    // ZooKeeper connection the receivers use to discover the Kafka brokers
    props.put("zookeeper.hosts", "localhost");
    props.put("zookeeper.port", "2181");
    props.put("zookeeper.broker.path", "/brokers");
    props.put("kafka.topic", "test-topic");
    props.put("kafka.consumer.id", "test-id");
    // ZooKeeper connection the consumer uses to store its processed offsets
    props.put("zookeeper.consumer.connection", "localhost:2182");
    props.put("zookeeper.consumer.path", "/spark-kafka");
    // Optional Properties
    props.put("consumer.forcefromstart", "true");
    props.put("consumer.fetchsizebytes", "1048576");
    props.put("consumer.fillfreqms", "250");
    props.put("consumer.backpressure.enabled", "true");

    // No master URL or app name is set here, so they must come from spark-submit
    SparkConf _sparkConf = new SparkConf()
        .set("spark.streaming.receiver.writeAheadLog.enable", "false");
    JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, new Duration(1000));

    // Specify number of Receivers you need.
    int numberOfReceivers = 3;

    JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
        jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
    // Spark 2.x removed the Function2<..., Void> overload of foreachRDD;
    // VoidFunction2 is the replacement.
    unionStreams
        .foreachRDD(new VoidFunction2<JavaRDD<MessageAndMetadata>, Time>() {
          @Override
          public void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
            rdd.collect(); // pulls every record to the driver; the result is unused here
            System.out.println(" Number of records in this batch " + rdd.count());
          }
        });
    jsc.start();
    jsc.awaitTermination();
  }

  public static void main(String[] args) throws Exception {
    ConsumerTest consumer = new ConsumerTest();
    consumer.start();
  }
}
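Which dependency or jar do I need to add so that the org.apache.spark.streaming.scheduler.ReceiverLauncher and org.apache.spark.streaming.Scheduler imports resolve with Spark 2.2.0?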