2017-12-07 5 views
1

다음 코드 스 니펫이 있는데이 두 코드의 차이점은 무엇이며 어떤 코드를 사용해야합니까? 나는 스파크 2.2를 사용하고있다.SparkSession.sql과 Dataset.sqlContext.sql의 차이점은 무엇입니까?

Dataset<Row> df = sparkSession.readStream() 
    .format("kafka") 
    .load(); 

df.createOrReplaceTempView("table"); 
df.printSchema(); 

Dataset<Row> resultSet = df.sqlContext().sql("select value from table"); //sparkSession.sql(this.query); 
StreamingQuery streamingQuery = resultSet 
     .writeStream() 
     .trigger(Trigger.ProcessingTime(1000)) 
     .format("console") 
     .start(); 

Dataset<Row> df = sparkSession.readStream() 
    .format("kafka") 
    .load(); 

df.createOrReplaceTempView("table"); 

Dataset<Row> resultSet = sparkSession.sql("select value from table"); //sparkSession.sql(this.query); 
StreamingQuery streamingQuery = resultSet 
     .writeStream() 
     .trigger(Trigger.ProcessingTime(1000)) 
     .format("console") 
     .start(); 

답변

2

대는 df.sqlContext().sql("sql query")sparkSession.sql("sql query") 사이에 미묘한 차이가 입니다.

은 하나의 불꽃 응용 프로그램에서 제로, 두 개 이상의 SparkSession들 수 있습니다 (그러나 당신이 적어도이 종종 하나의 SparkSession불꽃 SQL 응용 프로그램에서 것이다 가정한다) 있습니다.

DatasetSparkSession에 바인딩되며 SparkSession은 변경되지 않습니다.

누군가가 원하는 이유가 궁금 할 지 모르지만 쿼리간에 경계가 생기며 서로 다른 데이터 세트에 대해 동일한 테이블 이름을 사용할 수 있습니다. 실제로 스파크 SQL의 매우 강력한 기능입니다.

다음 예제는 그 차이점을 보여 주며 결과가 어째서 강력한 지 알 수 있습니다.

scala> spark.version 
res0: String = 2.3.0-SNAPSHOT 

scala> :type spark 
org.apache.spark.sql.SparkSession 

scala> spark.sql("show tables").show 
+--------+---------+-----------+ 
|database|tableName|isTemporary| 
+--------+---------+-----------+ 
+--------+---------+-----------+ 

scala> val df = spark.range(5) 
df: org.apache.spark.sql.Dataset[Long] = [id: bigint] 

scala> df.sqlContext.sql("show tables").show 
+--------+---------+-----------+ 
|database|tableName|isTemporary| 
+--------+---------+-----------+ 
+--------+---------+-----------+ 

scala> val anotherSession = spark.newSession 
anotherSession: org.apache.spark.sql.SparkSession = [email protected] 

scala> anotherSession.range(10).createOrReplaceTempView("new_table") 

scala> anotherSession.sql("show tables").show 
+--------+---------+-----------+ 
|database|tableName|isTemporary| 
+--------+---------+-----------+ 
|  |new_table|  true| 
+--------+---------+-----------+ 


scala> df.sqlContext.sql("show tables").show 
+--------+---------+-----------+ 
|database|tableName|isTemporary| 
+--------+---------+-----------+ 
+--------+---------+-----------+