2017-12-16 9 views
2

Spark 2에서 이러한 문제를 해결하려고하지만 솔루션을 찾을 수 없습니다. Spark - 데이터 프레임에 정의 된 규칙을 다른 데이터 프레임에 적용하는 방법

은 내가 dataframe을 가지고 :

+----+-------+------+ 
|id |COUNTRY| MONTH| 
+----+-------+------+ 
| 1 | US | 1 | 
| 2 | FR | 1 | 
| 4 | DE | 1 | 
| 5 | DE | 2 | 
| 3 | DE | 3 | 
+----+-------+------+ 

그리고dataframe의 B :

+-------+------+------+ 
|COLUMN |VALUE | PRIO | 
+-------+------+------+ 
|COUNTRY| US | 5 | 
|COUNTRY| FR | 15 | 
|MONTH | 3 | 2 | 
+-------+------+------+ 

아이디어가 순서대로 dataframe A를 dataframe의 B의 "규칙"을 적용하는 것입니다 이 결과를 얻으십시오 :

데이터 프레임 A ' : 0

+----+-------+------+------+ 
|id |COUNTRY| MONTH| PRIO | 
+----+-------+------+------+ 
| 1 | US | 1 | 5 | 
| 2 | FR | 1 | 15 | 
| 4 | DE | 1 | 20 | 
| 5 | DE | 2 | 20 | 
| 3 | DE | 3 | 2 | 
+----+-------+------+------+ 
나는 그런가 someting 시도 :

dfB.collect.foreach(r => 
    var dfAp = dfA.where(r.getAs("COLUMN") == r.getAs("VALUE")) 
    dfAp.withColumn("PRIO", lit(r.getAs("PRIO"))) 
) 

을하지만 그것이 올바른 방법이 아니다 확신합니다.

Spark에서이 문제를 해결하기위한 전략은 무엇입니까?

답변

3

일련의 규칙이 비교적 작다고 가정하면 (최악의 시나리오 인 데이터의 크기와 생성 된 표현의 크기는 가능한 한 염두에 두어 야합니다. can crash the planner) 가장 간단한 해결책은 로컬 컬렉션 및 맵을 사용하는 것입니다 그것을 SQL의 표현 :

import org.apache.spark.sql.functions.{coalesce, col, lit, when} 

val df = Seq(
    (1, "US", "1"), (2, "FR", "1"), (4, "DE", "1"), 
    (5, "DE", "2"), (3, "DE", "3") 
).toDF("id", "COUNTRY", "MONTH") 

val rules = Seq(
    ("COUNTRY", "US", 5), ("COUNTRY", "FR", 15), ("MONTH", "3", 2) 
).toDF("COLUMN", "VALUE", "PRIO") 


val prio = coalesce(rules.as[(String, String, Int)].collect.map { 
    case (c, v, p) => when(col(c) === v, p) 
} :+ lit(20): _*) 

df.withColumn("PRIO", prio) 
+---+-------+-----+----+ 
| id|COUNTRY|MONTH|PRIO| 
+---+-------+-----+----+ 
| 1|  US| 1| 5| 
| 2|  FR| 1| 15| 
| 4|  DE| 1| 20| 
| 5|  DE| 2| 20| 
| 3|  DE| 3| 2| 
+---+-------+-----+----+ 

당신은 각각 최소 또는 최대 일치하는 값을 적용 least 또는 greatestcoalesce을 대체 할 수 있습니다. 당신이 할 수 규칙의 큰 세트와

:

  • melt data 긴 형식으로 변환 할 수 있습니다. 열 및 값에 의한

    val dfLong = df.melt(Seq("id"), df.columns.tail, "COLUMN", "VALUE") 
    
  • join. (실시 예 min 용) 적절한 집계 기능 id 의해 PRIOR 집계

  • :

    val priorities = dfLong.join(rules, Seq("COLUMN", "VALUE")) 
        .groupBy("id") 
        .agg(min("PRIO").alias("PRIO")) 
    
  • 외부 id 의해 df와 출력 가입.

    df.join(priorities, Seq("id"), "leftouter").na.fill(20) 
    
    +---+-------+-----+----+ 
    | id|COUNTRY|MONTH|PRIO| 
    +---+-------+-----+----+ 
    | 1|  US| 1| 5| 
    | 2|  FR| 1| 15| 
    | 4|  DE| 1| 20| 
    | 5|  DE| 2| 20| 
    | 3|  DE| 3| 2| 
    +---+-------+-----+----+ 
    
0

이다 dataframeB의 규칙을 가정 할 수 제한 내가

val code = udf{(x:String,y:Int)=>if(x=="US") "5" else if (x=="FR") "15" else if (y==3) "2" else "20"} 

df.withColumn("PRIO",code($"COUNTRY",$"MONTH")).show() 
UDF

사용하여 테이블

+---+-------+------+ 
| id|COUNTRY|MONTH| 
+---+-------+------+ 
| 1|  US|  1| 
| 2|  FR|  1| 
| 4|  DE|  1| 
| 5|  DE|  2| 
| 3|  DE|  3| 
+---+-------+------+ 

아래에 대한 dataframe "DF"를 만든

출력

+---+-------+------+----+ 
| id|COUNTRY|MONTH|PRIO| 
+---+-------+------+----+ 
| 1|  US|  1| 5| 
| 2|  FR|  1| 15| 
| 4|  DE|  1| 20| 
| 5|  DE|  2| 20| 
| 3|  DE|  3| 2| 
+---+-------+------+----+