-2

데이터 세트가 test1.txt입니다. 다음과 같은 데이터가 포함되어 있습니다.Scala를 사용하여 Spark에서 데이터를 집계하는 방법은 무엇입니까?

2::1::3 
1::1::2 
1::2::2 
2::1::5 
2::1::4 
3::1::2 
3::1::1 
3::2::2 

다음 코드를 사용하여 데이터 프레임을 만들었습니다.

case class Test(userId: Int, movieId: Int, rating: Float) 
def pRating(str: String): Rating = { 
val fields = str.split("::") 
assert(fields.size == 3) 
Test(fields(0).toInt, fields(1).toInt, fields(2).toFloat) 
} 

val ratings = spark.read.textFile("C:/Users/test/Desktop/test1.txt").map(pRating).toDF() 
2,1,3 
1,1,2 
1,2,2 
2,1,5 
2,1,4 
3,1,2 
3,1,1 
3,2,2 

하지만 아래와 같은 결과를 출력하고 싶습니다. 중복 된 조합을 제거하고 field(2) value sum of values1,1, 2.0 대신.

1,1,2.0 
1,2,2.0 
2,1,12.0 
3,1,3.0 
3,2,2.0 

제발 어떻게 도와 드릴까요?

+1

dataframe.groupBy ("column1", "column2"). sum ("column3")이 작동해야합니다. – Fabich

+0

덕분에 작동 –

답변

0
ratings.groupBy("userId","movieId").sum(rating) 
+0

이 코드는 중복 행을 제거하기위한 요구 사항을 충족하지 않습니다. 'groupBy' 전에'distinct'가 필요합니다. – Sim

3

중복을 제거하려면 df.distinct을 사용하십시오. 먼저 groupBy을 집계하고 agg을 집계하십시오. 모두 함께이 퍼팅 :

case class Rating(userId: Int, movieId: Int, rating: Float) 

def pRating(str: String): Rating = { 
    val fields = str.split("::") 
    assert(fields.size == 3) 
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat) 
} 

val ratings = spark.read.textFile("C:/Users/test/Desktop/test1.txt").map(pRating) 
val totals = ratings.distinct 
    .groupBy('userId, 'movieId) 
    .agg(sum('rating).as("rating")) 
    .as[Rating] 

난 당신이 Dataset[Rating]로 최종 결과를 싶어 확실하지 오전과 distinctsum 논리는 당신이 문제의 예로 싶어 정확히인지하는 것은 매우 명확하지 않다 그러나 바라건대, 이것은 당신에게 필요한 것을 줄 것입니다.