2017-10-09 11 views
1

HDFS에서 데이터를 읽기 위해 스파크 스트리밍을 사용하고 싶습니다. 아이디어는 다른 프로그램이 내 스파크 스트리밍 작업이 처리 할 HDFS 디렉토리에 새 파일을 계속 업로드한다는 것입니다. 그러나, 나는 또한 끝 조건을 갖고 싶다. 즉, HDFS로 파일을 업로드하는 프로그램이 스파크 스트리밍 프로그램에 신호를 보내면 모든 파일을 업로드하는 방법입니다.스파크 스트리밍에서 정지 조건을 만드는 방법은 무엇입니까?

간단한 예를 들면 Here에서 프로그램을 가져옵니다. 코드는 아래와 같습니다. 다른 프로그램이 해당 파일을 업로드한다고 가정하면 스파크 스트리밍 프로그램에 CTRL + C를 누를 필요없이 해당 프로그램에서 최종 상태를 프로 그램으로 알릴 수 있습니까?

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

object StreamingWordCount { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println("Usage StreamingWordCount <input-directory> <output-directory>") 
     System.exit(0) 
    } 
    val inputDir=args(0) 
    val output=args(1) 
    val conf = new SparkConf().setAppName("Spark Streaming Example") 
    val streamingContext = new StreamingContext(conf, Seconds(10)) 
    val lines = streamingContext.textFileStream(inputDir) 
    val words = lines.flatMap(_.split(" ")) 
    val wc = words.map(x => (x, 1)) 
    wc.foreachRDD(rdd => { 
     val counts = rdd.reduceByKey((x, y) => x + y) 
     counts.saveAsTextFile(output) 
     val collectedCounts = counts.collect 
     collectedCounts.foreach(c => println(c)) 
    } 
    ) 

    println("StreamingWordCount: streamingContext start") 
    streamingContext.start() 
    println("StreamingWordCount: await termination") 
    streamingContext.awaitTermination() 
    println("StreamingWordCount: done!") 
    } 
} 
+0

당신은 당신의 일 업로드 데이터의 끝 부분에 약간의 제어 바이트를 추가 한 다음 스파크 스트리밍 프로그램에서 그 바이트를 감시하고 그 바이트가 일치 할 때 종료 할 수 있을까요? 0x1c0x0d와 같은 것을 추가 하시겠습니까? 또한 파일을 업로드 한 후에 Spark 스트리밍을 사용하여 다른 작업을 시작하지 않는 이유는 무엇입니까? – pjames

답변

0

그래, 알겠습니다. 기본적으로 ssc.stop()을 호출하는 곳에서 다른 스레드를 생성하여 스트림 처리를 중지하라는 신호를 보냅니다. 예를 들어, 이렇게.

val ssc = new StreamingContext(sparkConf, Seconds(1)) 
////////////////////////////////////////////////////////////////////// 
val thread = new Thread 
{ 
    override def run 
    { 
     .... 
     // On reaching the end condition 
     ssc.stop() 
    } 
} 
thread.start 
////////////////////////////////////////////////////////////////////// 
val lines = ssc.textFileStream("inputDir") 
.....