가능한 한 가지 방법은 창 기능을 사용하는 것입니다.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, min, abs}
val df = Seq(
("A", -10), ("A", 1), ("A", 5), ("B", 3), ("B", 9)
).toDF("type", "time")
첫 번째 시간으로 분류 연속 행 사이의 차이를 결정할 수 있습니다
:
// Partition by type and sort by time
val w1 = Window.partitionBy($"Type").orderBy($"Time")
// Difference between this and previous
val diff = $"time" - lag($"time", 1).over(w1)
그런 다음 해당 유형에 대한 모든 차이점을 통해 최소 찾기 : 당신의 목표는을 찾는 것입니다
// Partition by time unordered and take unbounded window
val w2 = Window.partitionBy($"Type").rowsBetween(Long.MinValue, Long.MaxValue)
// Minimum difference over type
val minDiff = min(diff).over(w2)
df.withColumn("min_diff", minDiff).show
// +----+----+--------+
// |type|time|min_diff|
// +----+----+--------+
// | A| -10| 4|
// | A| 1| 4|
// | A| 5| 4|
// | B| 3| 6|
// | B| 9| 6|
// +----+----+--------+
경우
현재 행과 그룹의 다른 행 사이의 최소 거리 (비슷한 접근법을 사용할 수 있음)
import org.apache.spark.sql.functions.{lead, when}
// Diff to previous
val diff_lag = $"time" - lag($"time", 1).over(w1)
// Diff to next
val diff_lead = lead($"time", 1).over(w1) - $"time"
val diffToClosest = when(
diff_lag < diff_lead || diff_lead.isNull,
diff_lag
).otherwise(diff_lead)
df.withColumn("diff_to_closest", diffToClosest)
// +----+----+---------------+
// |type|time|diff_to_closest|
// +----+----+---------------+
// | A| -10| 11|
// | A| 1| 4|
// | A| 5| 4|
// | B| 3| 6|
// | B| 9| 6|
// +----+----+---------------+