2017-09-29 9 views
1

'user_name', 'mac', 'dayte'(day)의 데이터 집합이 있습니다. GROUP BY [ 'user_name'] (으)로 그룹화하고 싶습니다. 그런 다음 GROUP BY에서 'dayte'를 사용하여 30 일 동안 진행되는 WINDOW를 만듭니다. 그 롤링 30 일 기간에, 나는 '맥'의 고유 번호를 계산하고 싶습니다. 그리고 그것을 내 데이터 프레임에 추가하십시오. 데이터 샘플.Python 롤링 30 일 기간 GROUP By 숫자 별 구분 문자열

user_name mac dayte 
0 001j 7C:D1 2017-09-15 
1 0039711 40:33 2017-07-25 
2 0459 F0:79 2017-08-01 
3 0459 F0:79 2017-08-06 
4 0459 F0:79 2017-08-31 
5 0459 78:D7 2017-09-08 
6 0459 E0:C7 2017-09-16 
7 133833 18:5E 2017-07-27 
8 133833 F4:0F 2017-07-31 
9 133833 A4:E4 2017-08-07 

나는 이것을 PANDAs 데이터 프레임으로 해결하려고 노력했다.

df['ct_macs'] = df.groupby(['user_name']).rolling('30d', on='dayte').mac.apply(lambda x:len(x.unique())) 

그러나 내가 PySpark에 시도 오류

Exception: cannot handle a non-unique multi-index! 

을 받았지만,뿐만 아니라 오류가 발생했습니다.

from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

#convert string timestamp to timestamp type    
df= df.withColumn('dayte', df.dayte.cast('timestamp')) 
#create window by casting timestamp to long (number of seconds) 
w = Window.partitionBy("user_name").orderBy("dayte").rangeBetween(-days(30), 0) 

df= df.select("user_name","mac","dayte",F.size(F.denseRank().over(w).alias("ct_mac"))) 

그러나이 불꽃에, 분명히, (윈도우에서 고유 한 카운트)는 지원되지 않습니다 나는 또한
df= df.select("user_name","dayte",F.countDistinct(col("mac")).over(w).alias("ct_mac")) 

을 시도하지만 그것의 오류

Py4JJavaError: An error occurred while calling o464.select. 
: org.apache.spark.sql.AnalysisException: Window function dense_rank does not take a frame specification.; 

을 받았다. 전적으로 SQL 접근 방식을 사용할 수 있습니다. MySQL이나 SQL Server에서 Python이나 Spark를 선호합니다.

답변

0

Pyspark

윈도우 함수는 다음과 같은 방법으로 한정되어

  • 프레임 만 행에 의해 정의 될 수없는 항목이
  • 열거 존재하지 않는
  • countDistinct
  • 값 기능은 프레임과 함께 사용할 수 없습니다.

대신 테이블을 자기 조인 할 수 있습니다. joingroupBy 지금

df = sc.parallelize([["001j", "7C:D1", "2017-09-15"], ["0039711", "40:33", "2017-07-25"], ["0459", "F0:79", "2017-08-01"], 
        ["0459", "F0:79", "2017-08-06"], ["0459", "F0:79", "2017-08-31"], ["0459", "78:D7", "2017-09-08"], 
        ["0459", "E0:C7", "2017-09-16"], ["133833", "18:5E", "2017-07-27"], ["133833", "F4:0F", "2017-07-31"], 
        ["133833", "A4:E4", "2017-08-07"]]).toDF(["user_name", "mac", "dayte"]) 

:

우선의이 dataframe 만들 수

import pyspark.sql.functions as psf 
df.alias("left")\ 
    .join(
     df.alias("right"), 
     (psf.col("left.user_name") == psf.col("right.user_name")) 
     & (psf.col("right.dayte").between(psf.date_add("left.dayte", -30), psf.col("left.dayte"))), 
     "leftouter")\ 
    .groupBy(["left." + c for c in df.columns])\ 
    .agg(psf.countDistinct("right.mac").alias("ct_macs"))\ 
    .sort("user_name", "dayte").show() 

    +---------+-----+----------+-------+ 
    |user_name| mac|  dayte|ct_macs| 
    +---------+-----+----------+-------+ 
    |  001j|7C:D1|2017-09-15|  1| 
    | 0039711|40:33|2017-07-25|  1| 
    |  0459|F0:79|2017-08-01|  1| 
    |  0459|F0:79|2017-08-06|  1| 
    |  0459|F0:79|2017-08-31|  1| 
    |  0459|78:D7|2017-09-08|  2| 
    |  0459|E0:C7|2017-09-16|  3| 
    | 133833|18:5E|2017-07-27|  1| 
    | 133833|F4:0F|2017-07-31|  2| 
    | 133833|A4:E4|2017-08-07|  3| 
    +---------+-----+----------+-------+ 

팬더이 python3 작동

import pandas as pd 
import numpy as np 
df["mac"] = pd.factorize(df["mac"])[0] 
df.groupby('user_name').rolling('30D', on="dayte").mac.apply(lambda x: len(np.unique(x)))