2016-06-22 5 views
0

나는 Spark 1.5.0 코드를 가지고 있습니다.스파크 데이터 프레임 reducebykey (비 고유 키 값 사용) 및 사용자 정의 값 연산

경우 클래스 myCaseClass (USER_ID : 문자열, 설명 : 문자열) 여기

내 UDF입니다

다음
val getConcatenated = udf((first: String, second: String, third: String) => { first + " " + second + " " + third}) 

내가 지금 내 dataframe

val df_description = df.withColumn("description",getConcatenated(col("text1"), col("text2"), col("weight"))).select("user_id","description") 

를 생성하는 경우, 내가 원하는 견인 열이있는이 DF에 대해 redueByKey 작업을 수행하십시오 (둘 다 문자열 임). 내 user_ids가 고유하지 않아 주어진 user_id에 대한 모든 값/설명 항목을 연결하려고합니다.

어떻게하면됩니까?

I는 다음과 같이 수행 할 수 있습니다

val description_rdd = df_description.map(row => myCaseClass(row.getString(0), row.getString(1))) 

을하지만 어떻게 내가 여기 pairedrdd을 생성합니까? 그런 다음 rdd에서 CreateDataFrame 메서드를 사용하여 데이터 프레임으로 돌아가고 싶습니다.

답변

0

아래의 코드는 키 열이 DF를 생성하고 열은 설명의 순서 유지 :

import org.apache.spark.rdd.PairRDDFunctions 

val pairRDD : PairRDDFunctions[String, String] = df_description.rdd.map(row => (row.getString(0), row.getString(1))) 
val groupedRDD = pairRDD.groupByKey().map(p => (p._1, p._2.toSeq)) 
val groupedDF = groupedRDD.toDF()