2

내가 이름으로 그룹화 열에 분기 이동 평균을 계산하기 위해 노력하고있어를 사용하여 이동 평균을 계산하는 동안 처음 몇 값을 폐기하고 난스파크 윈도우 함수

val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-2, 0) 
로 스파크 창 기능 사양을 정의한

내 DataFrame은 다음과 같습니다

enter image description here

+-----+----------+-----------+------------------+ 
| name|  date|amountSpent|   movingAvg| 
+-----+----------+-----------+------------------+ 
| Bob|2016-01-01|  25.0|    25.0| 
| Bob|2016-02-02|  25.0|    25.0| 
| Bob|2016-03-03|  25.0|    25.0| 
| Bob|2016-04-04|  29.0|26.333333333333332| 
| Bob|2016-05-06|  27.0|    27.0| 
|Alice|2016-01-01|  50.0|    50.0| 
|Alice|2016-02-03|  45.0|    47.5| 
|Alice|2016-03-04|  55.0|    50.0| 
|Alice|2016-04-05|  60.0|53.333333333333336| 
|Alice|2016-05-06|  65.0|    60.0| 
+-----+----------+-----------+------------------+ 

정확하게 각 이름의 할머니에 대한 강조 계산 첫 번째 값 웁. 처음 두 값을 어떤 문자열, 예를 들어 NULL으로 바꾸고 싶습니다. Spark/Scala에 대한 제한된 지식으로 DataFrame에서이 열을 추출하고 Scala에서 patch 함수를 사용하는 것에 대해 생각해 보았습니다. 그러나 두 번째 이름 그룹 시작과 같은 간격으로 값을 대체하는 방법을 알아낼 수 없습니다.

import com.datastax.spark.connector._ 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types.StructField 
import org.apache.spark.sql.types.StringType 
import org.apache.spark.sql.types.IntegerType 
import org.apache.spark.sql.types.DoubleType 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 
object Test { 

    def main(args: Array[String]) { 
    //val sparkSession = SparkSession.builder.master("local").appName("Test").config("spark.cassandra.connection.host", "localhost").config("spark.driver.host", "localhost").getOrCreate() 
    val sparkSession = SparkSession.builder.master("local").appName("Test").config("spark.cassandra.connection.host", "localhost").config("spark.driver.host", "localhost").getOrCreate() 
    val sc = sparkSession.sparkContext 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sparkSession.implicits._ 

    val customers = sc.parallelize(List(("Alice", "2016-01-01", 50.00), 
     ("Alice", "2016-02-03", 45.00), 
     ("Alice", "2016-03-04", 55.00), 
     ("Alice", "2016-04-05", 60.00), 
     ("Alice", "2016-05-06", 65.00), 
     ("Bob", "2016-01-01", 25.00), 
     ("Bob", "2016-02-02", 25.00), 
     ("Bob", "2016-03-03", 25.00), 
     ("Bob", "2016-04-04", 29.00), 
     ("Bob", "2016-05-06", 27.00))).toDF("name", "date", "amountSpent") 

    import org.apache.spark.sql.expressions.Window 
    import org.apache.spark.sql.functions._ 

    // Create a window spec. 
    val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-2, 0) 

    val ls=customers.withColumn("movingAvg",avg(customers("amountSpent")).over(wSpec1)) 
    ls.show() 

    } 
} 
+0

당신이 재현에서 데이터를 공유 할 수 있습니다 (즉, 전체 범위 -2 0에 걸쳐)의 평균을 계산에만 제안 : 여기에 내 코드입니다 그림 대신에 형식을 써주시겠습니까? – mtoto

+0

완료! 제안 해 주셔서 감사합니다. – Swapnil

+0

그래서 예상되는 출력은 처음 두 값을 그룹별로 null로 다시 복제하는 것입니까? 행 3 이후의 값은 어떨까요? – mtoto

답변

3

나는 윈도우 exactely 3 행이 포함 된 경우

val ls=customers 
.withColumn("count",count(($"amountSpent")).over(wSpec1)) 
.withColumn("movingAvg",when($"count"===3,avg(customers("amountSpent")).over(wSpec1))) 

ls.show() 


+-----+----------+-----------+-----+------------------+ 
| name|  date|amountSpent|count|   movingAvg| 
+-----+----------+-----------+-----+------------------+ 
| Bob|2016-01-01|  25.0| 1|    null| 
| Bob|2016-02-02|  25.0| 2|    null| 
| Bob|2016-03-03|  25.0| 3|    25.0| 
| Bob|2016-04-04|  29.0| 3|26.333333333333332| 
| Bob|2016-05-06|  27.0| 3|    27.0| 
|Alice|2016-01-01|  50.0| 1|    null| 
|Alice|2016-02-03|  45.0| 2|    null| 
|Alice|2016-03-04|  55.0| 3|    50.0| 
|Alice|2016-04-05|  60.0| 3|53.333333333333336| 
|Alice|2016-05-06|  65.0| 3|    60.0| 
+-----+----------+-----------+-----+------------------+