I am trying to implement a fault-tolerant Spark Streaming with Kafka application. When the application is restarted, it re-reads messages that were already consumed before the restart, so the computed results come out wrong. Please help me solve this problem. (Possibly related: "Spark streaming checkpoint read failure after restart".)
Here is the code, written in Java:
public static JavaStreamingContext createContextFunc() {
    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir);
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);
    return streamingContext;
}
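Note that for getOrCreate to restore correctly, the entire DStream lineage, including the output operations, must be defined inside this factory function; anything wired up outside it is not captured in the checkpoint and is lost on restart. A minimal sketch of the shape this takes (the print() output is only a placeholder for the application's real processing):

public static JavaStreamingContext createContextFunc() {
    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();
    String checkpointDir = new ApplicationConf().getCheckpointDirectory();

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir);
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);

    // All transformations and output operations must be registered here,
    // before the context is returned, so they are serialized into the checkpoint.
    kafkaInputStream.print();

    return streamingContext;
}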
public static void main(String[] args) throws InterruptedException {
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    // On the first run the factory is invoked; on restart the context
    // is rebuilt from the checkpoint data and the factory is skipped.
    Function0<JavaStreamingContext> createContextFunc = () -> createContextFunc();
    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc);

    streamingContext.start();
    streamingContext.awaitTermination();
}
public JavaStreamingContext getStreamingContext(String checkpointDir) {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String master = conf.getMaster();
    int duration = conf.getDuration();

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
    streamingContext.checkpoint(checkpointDir);
    return streamingContext;
}
public SparkSession getSession() {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String hiveConf = conf.getHiveConf();
    String thriftConf = conf.getThriftConf();
    int shufflePartitions = conf.getShuffle();

    SparkSession spark = SparkSession
            .builder()
            .appName(appName)
            .config("spark.sql.warehouse.dir", hiveConf)
            .config("hive.metastore.uris", thriftConf)
            .enableHiveSupport()
            .getOrCreate();

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions);
    return spark;
}
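If this session is used inside the streaming job, keep in mind that code restored from a checkpoint cannot rely on objects created in main. The Spark streaming guide recommends a lazily instantiated session built from the RDD's SparkConf inside foreachRDD. A sketch, assuming logdata is the DStream returned by getKafkaInputStream below:

logdata.foreachRDD(rdd -> {
    // Build (or reuse) the session from the RDD's SparkConf so this also
    // works when the job is restored from checkpoint data.
    SparkSession spark = SparkSession.builder()
            .config(rdd.context().getConf())
            .getOrCreate();
    Dataset<Row> df = spark.createDataset(rdd.rdd(), Encoders.STRING()).toDF("line");
    // ... run the SQL / DataFrame logic for the batch here ...
});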
public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) {
    KafkaConfig kafkaConfig = new KafkaConfig();
    Set<String> topicsSet = kafkaConfig.getTopicSet();
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams();

    // Create a direct Kafka stream with the configured brokers and topics.
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value);
    return logdata;
}
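Since the problem is re-reading already-processed messages after a restart, the standard pattern for the direct stream may be worth noting: disable Kafka auto-commit in kafkaParams (enable.auto.commit=false, plus a stable group.id) and commit offsets back to Kafka only after each batch has been processed. A sketch using the kafka010 offset APIs (HasOffsetRanges and CanCommitOffsets from org.apache.spark.streaming.kafka010), applied to the messages stream above; the processing step is a placeholder:

messages.foreachRDD(rdd -> {
    // RDDs produced by the direct stream carry their Kafka offset ranges.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch here ...

    // Commit the offsets only after the batch succeeded, so a restart
    // resumes from the last committed position instead of re-reading.
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});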
Here is a link to the GitHub project: https://github.com/ThisaST/Spark-Fault-Tolerance