자바 병을 처리 중입니다. 누산기는 스트림 값을 합산합니다. 문제는 매번 증가하거나 특정 주기로 UI의 값을 표시하려고합니다.DStream에 업데이트 된 현재 누적 기 값을 표시하는 방법은 무엇입니까?
그러나 누적 기 값은 드라이버 프로그램에서만 가져올 수 있으므로 프로세스 실행이 완료 될 때까지이 값에 액세스 할 수 없습니다. 내가이 값에 주기적으로 어떻게 접근 할 수 있는지에 대한 생각은? 나는 카프카를 사용하여 데이터를 스트리밍하고
package com.spark;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaSpark {
/**
* @param args
*/
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
conf.setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
new Duration(5000));
final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("test", 1);
JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc,
"localhost:2181", "group1", topicMap);
JavaDStream<Integer> map = lines
.map(new Function<Tuple2<String, String>, Integer>() {
public Integer call(Tuple2<String, String> v1)
throws Exception {
if (v1._2.contains("the")) {
accum.add(1);
return 1;
}
return 0;
}
});
map.print();
jssc.start();
jssc.awaitTermination();
System.out.println("*************" + accum.value());
System.out.println("done");
}
}
아래에 주어진
내 코드입니다.
어큐뮬레이터가 유스 케이스에 적합한 이유는 무엇이라고 생각하십니까? 이 집계의 현재 가치, 즉 고객이 변경 사항에 대한 알림을 받기 위해 사용하는 또 다른 카프카 (Kafka) 주제를 반영하기 위해 외부 데이터 저장소에 대해 생각하고 싶습니다. –