3

이전 3 개 값의 창 크기에 대한 달러의 롤링 중간 값은 어떻게 계산합니까?Window()를 사용하여 Pyspark에서 롤링 중간 값을 계산하는 방법은 무엇입니까?

입력 데이터

dollars timestampGMT  
25  2017-03-18 11:27:18 
17  2017-03-18 11:27:19 
13  2017-03-18 11:27:20 
27  2017-03-18 11:27:21 
13  2017-03-18 11:27:22 
43  2017-03-18 11:27:23 
12  2017-03-18 11:27:24 

예상 출력 데이터 코드 아래

dollars timestampGMT   rolling_median_dollar 
25  2017-03-18 11:27:18 median(25) 
17  2017-03-18 11:27:19 median(17,25) 
13  2017-03-18 11:27:20 median(13,17,25) 
27  2017-03-18 11:27:21 median(27,13,17) 
13  2017-03-18 11:27:22 median(13,27,13) 
43  2017-03-18 11:27:23 median(43,13,27) 
12  2017-03-18 11:27:24 median(12,43,13) 

는 평균 이동 않지만 pyspark 나던() F.median 있습니다.

pyspark: rolling average using timeseries data

EDIT 1 도전은 중간() 함수 그다지 출구이다. 내가 할 수 있었다 이동 평균 원한다면 나는

df = df.withColumn('rolling_average', F.median("dollars").over(w)) 

할 수 없습니다

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

편집 2 : approxQuantile를 사용하여 시도()

windfun = Window().partitionBy().orderBy(F.col(date_column)).rowsBetwe‌​en(-3, 0) sdf.withColumn("movingMedian", sdf.approxQuantile(col='a', probabilities=[0.5], relativeError=0.00001).over(windfun)) 

그러나 점점 오류

AttributeError: 'list' object has no attribute 'over' 

EDIT 3

촉매 최적화로 인한 이점이 없기 때문에 Udf없이 솔루션을 제공하십시오.

+0

당신이 timestampGMT''에 의해 주문하고 계산을하려고 했나 창 당 행 이상? 문제가 무엇인지 궁금해서 (중앙값의 구현이 그 문제 일지 궁금해 할 때). –

+0

님께 서 정확한 질문을 포함하도록 질문을 편집했습니다. –

+0

'df.stat.approxQuantile' 및 https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglogand.quantiles .html? –

답변

2

한 가지 방법은 윈도우 당 목록으로 $dollars 열을 수집 한 후 사용하여 결과 목록의 중간 값을 계산하는 것입니다 udf :

from pyspark.sql.window import Window 
from pyspark.sql.functions import * 
import numpy as np 
from pyspark.sql.types import FloatType 

w = (Window.orderBy(col("timestampGMT").cast('long')).rangeBetween(-2, 0)) 
median_udf = udf(lambda x: float(np.median(x)), FloatType()) 

df.withColumn("list", collect_list("dollars").over(w)) \ 
    .withColumn("rolling_median", median_udf("list")).show(truncate = False) 
+-------+---------------------+------------+--------------+ 
|dollars|timestampGMT   |list  |rolling_median| 
+-------+---------------------+------------+--------------+ 
|25  |2017-03-18 11:27:18.0|[25]  |25.0   | 
|17  |2017-03-18 11:27:19.0|[25, 17] |21.0   | 
|13  |2017-03-18 11:27:20.0|[25, 17, 13]|17.0   | 
|27  |2017-03-18 11:27:21.0|[17, 13, 27]|17.0   | 
|13  |2017-03-18 11:27:22.0|[13, 27, 13]|13.0   | 
|43  |2017-03-18 11:27:23.0|[27, 13, 43]|27.0   | 
|12  |2017-03-18 11:27:24.0|[13, 43, 12]|13.0   | 
+-------+---------------------+------------+--------------+ 
+1

님께 추가했습니다. 하지만 Udf없이 촉매 최적화로 이익을 얻지 못할 수 있습니까? –

+0

네이티브 스파크 대안이 없습니다. 두려워요. – mtoto

+0

윈도우 함수와 함께 percentRank()를 사용하는 것은 어떻습니까? 나는 어딘가에서 읽었지만 코드는 주어지지 않았다. 벨이 울리는가? –