2017-09-08 10 views
0

나는 유사한 RDD,RDD의 각 행에 서로를 곱하는 방법은 무엇입니까?

CELL-ID | COUNT 
-------------- 
abcd  10 
DEF  20 
ghi  15 

내가

CELL-ID-1 | CELL-ID-2 | PRODUCT 
-------------- 
abcd  DEF   200 
abcd  ghi   150 
DEF  abcd   200 
DEF  ghi   300 
... 
.... 

어떻게

가이 작업을 수행 할 수 있습니다와 함께 RDD를 얻을 필요가 있나요? 나는 t1t2튜플을 (전체 "기록")를 나타냅니다 직교 제품을 사용하는 연결했지만 당신은 할 수 있습니다 출력

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case ((t1: _,Int), (t2: _,Int)) if t1 != t2 => t1 * t2 
} 
+0

'originalRDD'가 큰 데이터 세트 인 경우'.collect()'는 드라이버 프로그램의 메모리에 맞지 않습니다. 당신은 왜 당신이 산출물을 얻을 수 없었는지 언급하지 않습니다, 그래서 우리는 단지 추측 할 수 있습니다. –

+0

꽤 큰 RDD, 6M + 레코드입니다.이 코드를 '지도'또는 다른 것으로 이식해야합니다. – Infamous

+0

사실, 저는 실수였습니다 ... @ tzach-zohar의 답변에 대한 내 의견을 참조하십시오. –

답변

4

를 가져올 수 없습니다 :

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case (t1: (String ,Int), t2: (String ,Int)) if t1 != t2 => (t1._1, t2._1, t1._2 * t2._2) 
} 

또는를, 당신은 동일한 작업을 수행하지만, 더 그들을 파괴하기 위해 패턴 매칭을 사용할 수 있습니다

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case ([email protected](s1 ,i1), [email protected](s2, i2)) if t1 != t2 => (s1, s2, i1 * i2) 
} 

귀하의 솔루션 모습을 한 번에 두 가지 작업을 시도하는 것과 같습니다 ...

+0

'.collect()'대신에'.map()'을 추천합니다. '.collect()'호출은 필터링 된 데카르트 결과를 드라이버 프로그램의 힙으로 되돌려줍니다. 이 장난감 데이터 세트에는 문제가 없지만 큰 분산 데이터 세트를 사용하면 결과가 상당히 커집니다. –

+1

@TravisHegner는 사실이 아닙니다. 이름이 혼란 스러울 지 모르지만,이 특정 오버로드 인'RDD.collect'는 데이터를 드라이버로 보내지 않습니다. 부분적인 기능을 적용하여 단순한 매핑 및 필터링입니다. 사실 구현은'filter (f.isDefinedAt) .map (f)'처럼 간단합니다. https://github.com/apache/spark/blob/cd0a08361e2526519e7c131c42116bf56fa62c76/core/src/main/scala/org/apache를 참조하십시오. /spark/rdd/RDD.scala#L958-L961 –

+0

내 실수. 설명에 감사드립니다. –