2017-11-17 9 views
1
def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo") 
    val scc = new StreamingContext(sparkConf, Duration(5000)) 
    val topics = Set("test1") 
    val kafkaParam = Map(
    "metadata.broker.list" -> "localhost:9092" 
) 

    val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics) 

    val words = stream.map(_._2) 
    .flatMap(_.split(" ")) 

    // Convert RDDs of the words DStream to DataFrame and run SQL query 
    words.foreachRDD { (rdd: RDD[String], time: Time) => 
    // Get the singleton instance of SparkSession 
    val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) 
    import spark.implicits._ 

    // Convert RDD[String] to RDD[case class] to DataFrame 
    val wordsDataFrame = rdd.map(w => Record(w)).toDF() 

    // Creates a temporary view using the DataFrame 
    wordsDataFrame.createOrReplaceTempView("words") 

    // Do word count on table using SQL and print it 
    val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word") 
    wordCountsDataFrame.show() wordCountsDataFrame.rdd.saveAsTextFile(dir) 
} 
    scc.start() 
    scc.awaitTermination() 

가}사용 스파크 스트리밍

이 사용 스파크 스트리밍에 대한 내 코드가 SQL 메시지를 실행하는 것입니다 결과를 HDFS로 저장하지만 테이블이 아닌 빈 파일이나 작은 파일 (한 줄의 데이터 만)을 출력합니다.

답변

0

도움이되는지 알려 주시면 알려주세요.

wordCountsDataFrame.rdd.repartition(1).saveAsTextFile("/path/to/output")

+0

나는 마지막으로이 시도 : wordCountsDataFrame.rdd.coalesce (1, false)를 .saveAsTextFile ("/ 경로// 출력") 내가 HDFS에 메시지를 기록 너무 많은 patitions이 있다는 결과를 알고 . – zhenganqing

+0

당신에게 효과가 있습니까? –