2017-12-21 33 views
1

Iterable [MyObject] (groupBy보다 RDD [MyObject])에 포함 된 일부 데이터를 "추출해야"합니다. 범위 별 스칼라 RDD 개수

내 초기 RDD [MyObject를] :

내가하고 GROUPBY startCity으로 연령 범위별로 계산해야
|-----------|---------|----------| 
| startCity | endCity | Customer | 
|-----------|---------|----------| 
| Paris  | London | ID | Age | 
|   |   |----|-----| 
|   |   | 1 | 1 | 
|   |   |----|-----| 
|   |   | 2 | 1 | 
|   |   |----|-----| 
|   |   | 3 | 50 | 
|-----------|---------|----------| 
| Paris  | London | ID | Age | 
|   |   |----|-----| 
|   |   | 5 | 40 | 
|   |   |----|-----| 
|   |   | 6 | 41 | 
|   |   |----|-----| 
|   |   | 7 | 2 | 
|-----------|---------|----|-----| 
| New-York | Paris | ID | Age | 
|   |   |----|-----| 
|   |   | 9 | 15 | 
|   |   |----|-----| 
|   |   | 10| 16 | 
|   |   |----|-----| 
|   |   | 11| 46 | 
|-----------|---------|----|-----| 
| New-York | Paris | ID | Age | 
|   |   |----|-----| 
|   |   | 13| 7 | 
|   |   |----|-----| 
|   |   | 14| 9 | 
|   |   |----|-----| 
|   |   | 15| 60 | 
|-----------|---------|----|-----| 
| Barcelona | London | ID | Age | 
|   |   |----|-----| 
|   |   | 17| 66 | 
|   |   |----|-----| 
|   |   | 18| 53 | 
|   |   |----|-----| 
|   |   | 19| 11 | 
|-----------|---------|----|-----| 

- endCity에게

는 최종 결과는 다음과 같아야합니다

|-----------|---------|-------------| 
| startCity | endCity | Customer | 
|-----------|---------|-------------| 
| Paris  | London | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 3 | 
|   |   |------|------| 
|   |   |3-18 | 0 | 
|   |   |------|------| 
|   |   |19-99 | 3 | 
|-----------|---------|-------------| 
| New-York | Paris | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 0 | 
|   |   |------|------| 
|   |   |3-18 | 3 | 
|   |   |------|------| 
|   |   |19-99 | 2 | 
|-----------|---------|-------------| 
| Barcelona | London | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 0 | 
|   |   |------|------| 
|   |   |3-18 | 1 | 
|   |   |------|------| 
|   |   |19-99 | 2 | 
|-----------|---------|-------------| 

에서 순간 나는 같은 데이터 (처음에는 0-2 범위, 그 다음은 10-20, 그리고 21-99)를 3 번 ​​계산합니다.

처럼 :

Iterable[MyObject] ite 

ite.count(x => x.age match { 
    case Some(age) => { age >= 0 && age < 2 } 
} 

그것은 나에게 정수를 제공하여 일하고 있지만 전혀 나는 많은 시간을 계산해야하기 때문에 내가 생각 효율이 제발 할 수있는 가장 좋은 방법은 무엇입니까?

감사

편집 : 고객의 객체 인의 RDD와 그런 경우 클래스

+0

은 'Customer'가 배열입니까? –

+0

고객이 객체입니다 – Drakax

+0

우리가 당신을 도울 수 있도록 객체 유형을 공유 할 수 있습니까? 그것은 사건 계급인가요? –

답변

2
def computeRange(age : Int) = 
    if(age<=2) 
     "0-2" 
    else if(age<=10) 
     "2-10" 
    // etc, you get the idea 

, case class MyObject(id : String, age : Int)

rdd 
    .map(x=> computeRange(x.age) -> 1) 
    .reduceByKey(_+_) 

편집 : 을 수행해야하는 경우 일부 열에 따라 그룹을 만들면 이렇게 할 수 있습니다. 당신은 RDD [(SomeColumns, Iterable [MyObject])]를 가지고 있습니다. 다음 줄은 각 "범위"를 해당 발생 횟수와 연결시키는 맵을 제공합니다.

def computeMapOfOccurances(list : Iterable[MyObject]) : Map[String, Int] = 
    list 
     .map(_.age) 
     .map(computeRange) 
     .groupBy(x=>x) 
     .mapValues(_.size) 

val result1 = rdd 
    .mapValues(computeMapOfOccurances(_)) 

그리고 당신은 당신의 데이터를 평평하게해야 할 경우, 당신은 쓸 수 있습니다 :

val result2 = result1 
    .flatMapValues(_.toSeq)  
+0

감사합니다. RDD [MyObject]의 매력처럼 작동하지만 내 초기 RDD [MyObject]와 reduceByKey를 사용할 수 없기 때문에 Iterable [MyObject]가 있습니다. Iterable. 나의 초기 질문은 지금 내 마음 속에 분명하지 않았기 때문에 편집되었다. 미안. – Drakax

+0

나는 나의 대답도 편집했다. 희망이 도움이 – Oli

+0

고마워! – Drakax

1

당신이 Customer[Object]는이 case class

case class Customer(ID: Int, Age: Int) 

그리고 당신의 RDD[MyObject] 아래로 rdd 그대로 있다고 가정하면 case class 다음과 같이

case class MyObject(startCity: String, endCity: String, customer: List[Customer]) 
그래서 당신은

MyObject(Paris,London,List(Customer(1,1), Customer(2,1), Customer(3,50))) 
MyObject(Paris,London,List(Customer(5,40), Customer(6,41), Customer(7,2))) 
MyObject(New-York,Paris,List(Customer(9,15), Customer(10,16), Customer(11,46))) 
MyObject(New-York,Paris,List(Customer(13,7), Customer(14,9), Customer(15,60))) 
MyObject(Barcelona,London,List(Customer(17,66), Customer(18,53), Customer(19,11))) 

아래와 같이 입력을 (당신이 테이블 형식으로 가지고)를 갖는해야 그리고 당신은 또한 당신이 Iterable[MyObject]을 그룹화 한 후 단계 아래에 해당하는 언급 한 case class ES 이상 사용

val groupedRDD = rdd.groupBy(myobject => (myobject.startCity, myobject.endCity)) //groupedRDD: org.apache.spark.rdd.RDD[((String, String), Iterable[MyObject])] = ShuffledRDD[2] at groupBy at worksheetTest.sc:23 

그래서 당신이해야 할 다음 단계는 Iterable[MyObject]을 반복하는 mapValues를 사용하고 각 범위에 속하는 age의 계산,하면 마지막입니다 Y는 updateCounts재귀 함수를

def updateCounts(ageList: List[Int], map: Map[String, Int]) : Map[String, Int] = ageList match{ 
    case head :: tail => if(head >= 0 && head < 3) { 
    updateCounts(tail, map ++ Map("0-2" -> (map("0-2")+1))) 
    } else if(head >= 3 && head < 19) { 
    updateCounts(tail, map ++ Map("3-18" -> (map("3-18")+1))) 
    } else updateCounts(tail, map ++ Map("19-99" -> (map("19-99")+1))) 
    case Nil => map 
} 

CustomerOut 당신이

val finalResult = groupedRDD.mapValues(x => { 
    val rangeAge = Map("0-2" -> 0, "3-18" -> 0, "19-99" -> 0) 
    val list = x.flatMap(y => y.customer.map(z => z.Age)).toList 
    updateCounts(list, rangeAge).map(x => CustomerOut(x._1, x._2)).toList 
}) 

아래와 같이 요구 출력으로 변환하는 다른 case class

case class CustomerOut(Range: String, Count: Int) 

이다 그래서 finalResult

아래와 같다
((Barcelona,London),List(CustomerOut(0-2,0), CustomerOut(3-18,1), CustomerOut(19-99,2))) 
((New-York,Paris),List(CustomerOut(0-2,0), CustomerOut(3-18,4), CustomerOut(19-99,2))) 
((Paris,London),List(CustomerOut(0-2,3), CustomerOut(3-18,0), CustomerOut(19-99,3)))