1
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo")
val scc = new StreamingContext(sparkConf, Duration(5000))
val topics = Set("test1")
val kafkaParam = Map(
"metadata.broker.list" -> "localhost:9092"
)
val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
val words = stream.map(_._2)
.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show() wordCountsDataFrame.rdd.saveAsTextFile(dir)
}
scc.start()
scc.awaitTermination()
이 사용 스파크 스트리밍에 대한 내 코드가 SQL 메시지를 실행하는 것입니다 결과를 HDFS로 저장하지만 테이블이 아닌 빈 파일이나 작은 파일 (한 줄의 데이터 만)을 출력합니다.
나는 마지막으로이 시도 : wordCountsDataFrame.rdd.coalesce (1, false)를 .saveAsTextFile ("/ 경로// 출력") 내가 HDFS에 메시지를 기록 너무 많은 patitions이 있다는 결과를 알고 . – zhenganqing
당신에게 효과가 있습니까? –