2014-09-25 3 views
2

자바 병을 처리 중입니다. 누산기는 스트림 값을 합산합니다. 문제는 매번 증가하거나 특정 주기로 UI의 값을 표시하려고합니다.DStream에 업데이트 된 현재 누적 기 값을 표시하는 방법은 무엇입니까?

그러나 누적 기 값은 드라이버 프로그램에서만 가져올 수 있으므로 프로세스 실행이 완료 될 때까지이 값에 액세스 할 수 없습니다. 내가이 값에 주기적으로 어떻게 접근 할 수 있는지에 대한 생각은? 나는 카프카를 사용하여 데이터를 스트리밍하고

package com.spark; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.spark.Accumulator; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import scala.Tuple2; 

public class KafkaSpark { 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setAppName("Simple Application"); 
     conf.setMaster("local"); 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, 
       new Duration(5000)); 
     final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0); 
     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     topicMap.put("test", 1); 
     JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc, 
       "localhost:2181", "group1", topicMap); 

     JavaDStream<Integer> map = lines 
       .map(new Function<Tuple2<String, String>, Integer>() { 

        public Integer call(Tuple2<String, String> v1) 
          throws Exception { 
         if (v1._2.contains("the")) { 
          accum.add(1); 
          return 1; 
         } 
         return 0; 
        } 
       }); 

     map.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
     System.out.println("*************" + accum.value()); 
     System.out.println("done"); 
    } 
} 

아래에 주어진

내 코드입니다.

+0

어큐뮬레이터가 유스 케이스에 적합한 이유는 무엇이라고 생각하십니까? 이 집계의 현재 가치, 즉 고객이 변경 사항에 대한 알림을 받기 위해 사용하는 또 다른 카프카 (Kafka) 주제를 반영하기 위해 외부 데이터 저장소에 대해 생각하고 싶습니다. –

답변

1

jssc.star()가 호출 될 때만 스파크가 발생하며 실제 코드가 실행되기 시작합니다. 이제 컨트롤은 루프를 실행하기 시작합니다. 모든 system.out.println은 한 번만 호출됩니다. 매 루프마다 실행되지 않습니다. 아웃

이 작업이 documentation

당신이 저장 개체 텍스트 또는 하둡 파일로

인쇄() forEachRDD() 를 사용할 수 있습니다 확인 넣어

희망이

0
jssc.start(); 
while(true) { 
    System.out.println("current:" + accum.value()); 
    Thread.sleep(1000); 
} 
을하는 데 도움이