2016-12-29 4 views
0

다음은 제 데이터 세트입니다.spark dataset API : 다른 사용자와 함께 각 사용자의 기기 사용량 분포를 확인하십시오.

user,device,time_spent,video_start 
userA,mob,5,1 
userA,desk,5,2 
userA,desk,5,3 
userA,mob,5,2 
userA,mob,5,2 
userB,desk,5,2 
userB,mob,5,2 
userB,mob,5,2 
userB,desk,5,2 

나는 각 사용자에 대해 아래의 집계를 찾고 싶습니다.

user  total_time_spent  device_distribution 
    userA   20    {mob:60%,desk:40%} 
    userB   20    {mob:50%,desk:50%} 

Java에서 spark 2.0 API를 사용하여이를 달성 할 수있는 사람이 있습니까? UserDefinedAggregateFunction을 사용하여 시도했지만 그룹별로 그룹을 지원하지 않으므로 각 사용자 그룹을 장치별로 그룹화해야 각 장치에서 집계 된 시간을 찾을 수 있습니다.

답변

1

여기서 pivot 기능은 매우 유용합니다. 해당 주제에 대한 Databricks의 article 코드의 경우 (죄송는 스칼라하지만 자바로 번역하는 큰 문제가되지 않습니다) :

import org.apache.spark.sql.functions.udf 

case class DeviceDistribution(mob: String, desk: String) 

val makeDistribution = udf((mob: Long, desk: Long) => { 
    val mobPct = 100.0 * mob/(mob + desk) 
    val deskPct = 100.0 * desk/(mob + desk) 

    DeviceDistribution(s"$mobPct%", s"$deskPct%") 
}) 

// load your dataset 

data 
    .groupBy("user", "device") 
    .agg(sum("time_spent").as("total_time_spent_by_device")) 
    .groupBy("user") 
    .pivot("device", Seq("mob", "desk")) 
    .agg(first(col("total_time_spent_by_device"))) 
    .withColumn("total_time_spent", col("mob") + col("desk")) 
    .withColumn("device_distribution", makeDistribution(col("mob"), col("desk"))) 
    .select("user", "total_time_spent", "device_distribution") 
    .show 

// Result 
+-----+----------------+-------------------+ 
| user|total_time_spent|device_distribution| 
+-----+----------------+-------------------+ 
|userA|    25|  [60.0%,40.0%]| 
|userB|    20|  [50.0%,50.0%]| 
+-----+----------------+-------------------+ 

NB : 당신이 집계 함수를 필요로하는 pivot 기능. 여기에는 기기별로 하나의 값만 있기 때문에 first을 사용하면됩니다.

device_distribution 열 형식은 당신이 찾고 있지만하고 있지 정확히 : 당신이 당신의 값으로 원하는 모든 것을 할 수있는 피벗 선 후

  • (즉, 당신이 원하는 서식을 포함) 예를 들어 json 형식으로 출력 데이터를 저장할 때이 case class을 사용하면 정확히 원하는 형식을 갖게됩니다.
1

플로랑 Moiny,

덕분에 내 질문에 대답합니다.

그러나 생산에 적용하려면이 솔루션에 몇 가지 문제가 있음을 발견했습니다.

예를 들어 TB 데이터 소스에서 가능한 장치의 유형을 미리 알아야합니다. 이벤트 피벗은이 상황에서 이해하기가 거의 없습니다.

Java에서이 문제를 완벽하게 해결했습니다. 여기에서 볼 수 있습니다.

이 목적을 위해 UserDefinedAggregateFunction을 사용했습니다. 특히 집계 상황에서는 UDF를 사용해야합니다.

기본적으로 먼저 사용자 및 장치를 그룹화 한 다음이 사용자 지정 UDF를 호출하여 동시에 장치 배포를 찾고 사용자 수준에서 다른 집계를 수행합니다.

https://github.com/himanshu-parmar-bigdata/spark-java-udf-demo

감사합니다, Himanshu