2016-06-03 4 views
3

예제를 통해 내 시나리오를보다 쉽게 ​​설명 할 수 있습니다. I는 다음과 같은 데이터를 가지고 말 :이전 열과 가장 가까운 차이를 나타내는 추가 열 추가

Type Time A 1 B 3 A 5 B 9

I는 동일한 유형의 모든 열 사이의 최소 절대 값 차이를 나타내는 각 행에 추가 항목을 추가 할. 따라서 첫 번째 행의 경우 유형 A의 모든 시간의 최소 차이는 4이므로 열 1과 3의 경우 4, 열 2와 4의 경우 6이됩니다.

나는 Spark Spark SQL을 사용하면보다 유용한 정보를 얻을 수 있지만 SQL을 통해 설명해야 할 필요가 있다면 큰 도움이 될 것입니다.

답변

1

가능한 한 가지 방법은 창 기능을 사용하는 것입니다.

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.{lag, min, abs} 

val df = Seq(
    ("A", -10), ("A", 1), ("A", 5), ("B", 3), ("B", 9) 
).toDF("type", "time") 
첫 번째 시간으로 분류 연속 행 사이의 차이를 결정할 수 있습니다

:

// Partition by type and sort by time 
val w1 = Window.partitionBy($"Type").orderBy($"Time") 

// Difference between this and previous 
val diff = $"time" - lag($"time", 1).over(w1) 

그런 다음 해당 유형에 대한 모든 차이점을 통해 최소 찾기 : 당신의 목표는을 찾는 것입니다

// Partition by time unordered and take unbounded window 
val w2 = Window.partitionBy($"Type").rowsBetween(Long.MinValue, Long.MaxValue) 

// Minimum difference over type 
val minDiff = min(diff).over(w2) 

df.withColumn("min_diff", minDiff).show 


// +----+----+--------+ 
// |type|time|min_diff| 
// +----+----+--------+ 
// | A| -10|  4| 
// | A| 1|  4| 
// | A| 5|  4| 
// | B| 3|  6| 
// | B| 9|  6| 
// +----+----+--------+ 

경우

현재 행과 그룹의 다른 행 사이의 최소 거리 (비슷한 접근법을 사용할 수 있음)

import org.apache.spark.sql.functions.{lead, when} 

// Diff to previous 
val diff_lag = $"time" - lag($"time", 1).over(w1) 

// Diff to next 
val diff_lead = lead($"time", 1).over(w1) - $"time" 

val diffToClosest = when(
    diff_lag < diff_lead || diff_lead.isNull, 
    diff_lag 
).otherwise(diff_lead) 

df.withColumn("diff_to_closest", diffToClosest) 

// +----+----+---------------+ 
// |type|time|diff_to_closest| 
// +----+----+---------------+ 
// | A| -10|    11| 
// | A| 1|    4| 
// | A| 5|    4| 
// | B| 3|    6| 
// | B| 9|    6| 
// +----+----+---------------+ 
0

당신이 뭔가를 시도해야합니다 :

val sc: SparkContext = ... 
val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._ 

val input = sc.parallelize(Seq(
    ("A", 1), 
    ("B", 3), 
    ("A", 5), 
    ("B", 9) 
)) 

val df = input.groupByKey().flatMap { case (key, values) => 
    val smallestDiff = values.toList.sorted match { 
    case firstMin :: secondMin :: _ => secondMin - firstMin 
    case singleVal :: Nil => singleVal // Only one record for some `Type` 
    } 

    values.map { value => 
    (key, value, smallestDiff) 
    } 
}.toDF("Type", "Time", "SmallestDiff") 

df.show() 

출력 :

SQL 서버에서 테스트
+----+----+------------+ 
|Type|Time|SmallestDiff| 
+----+----+------------+ 
| A| 1|   4| 
| A| 5|   4| 
| B| 3|   6| 
| B| 9|   6| 
+----+----+------------+ 
1

2008

표 D ( 유형 VARCHAR (25), 시간 INT를 만들 )

insert into d 
values ('A',1), 
('B',3), 
('A',5), 
('B',9) 

--solution one, calculation in query, might not be smart if dataset is large. 
select * 
, (select max(time) m from d as i where i.type = o.type) - (select MIN(time) m from d as i where i.type = o.type) dif 
from d as o 

--or this 
select d.*, diftable.dif from d inner join 
(select type, MAX(time) - MIN(time) dif 
from d group by type) as diftable on d.type = diftable.type