2017-03-27 14 views
0

이것은 나의 예입니다.spark에서 파티션을 효율적으로 배포하고 사용하는 방법은 무엇입니까?

val arr = Array((1,2), (1,3), (1,4), (2,3), (4,5)) 
val data = sc.parallelize(arr, 5) 

data.glom.map(_length).collect 
Array[Int] = Array(1, 1, 1, 1, 1) 

val agg = data.reduceByKey(_+_) 
agg.glom.map(_.length).collect 
Array[Int] = Array(0, 1, 1, 0, 1) 

val fil = agg.filter(_._2 < 4) 
fil.glom.map(_.length).collect 
Array[Int] = Array(0, 0, 1, 0, 0) 

val sub = data.map{case(x,y) => (x, (x,y))}.subtractByKey(fil).map(_._2) 
Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (4,5)) 

sub.glom.map(_.length).collect 
Array[Int] = Array(0, 3, 0, 0, 1) 

궁금한 점은 파티션을 균등하게 배포하는 것입니다.

data 변수는 5 개의 파티션으로 구성되며 모든 데이터는 균등하게 분할됩니다. 여러 transformation operation

ex)par1: (1,2) 
    par2: (1,3) 
    par3: (1,4) 
    par4: (2,3) 
    par5: (4,5) 

sub 변수에 할당 된 다섯 개 파티션 만이 사용된다.

변수는 5 개의 파티션으로 구성되지만 모든 데이터가 균등하게 분할되지는 않습니다. 나는 sub 변수에 다른 transformation operation를 추가하면

ex)par1: empty 
    par2: (1,2),(1,3),(1,4) 
    par3: empty 
    par4: empty 
    par5: (4,5) 

는 5 개 사용 가능한 파티션이있을 것이다,하지만이 파티션은 작업에 사용됩니다.

ex)sub.map{case(x,y) => (x, x, (x,y))} 

그래서 데이터를 조작 할 때 사용 가능한 모든 파티션을 사용하고 싶습니다.

나는 repartition 방법을 사용했지만 비용은 저렴하지 않습니다.

ex) sub.repartition(5).glom.map(_.length).collect 
Array[Int] = Array(0, 1, 1, 2, 0) 

가능한 한 많은 파티션을 활용하는 현명한 방법을 찾고 있습니다.

좋은 방법이 있습니까? 하지 5 행 -

답변

1

그래서 repartition 확실히

귀하의 예는 스파크는 수십억 개의 행을 처리하기 위해 구축 그대로 아무것도 보여주기 위해 너무 간단 :) 갈 수있는 방법입니다. repartition은 정확히 을 각 파티션에 동일한 수의 행을 넣지 않지만 균등하게 데이터를 분배합니다. 대신 1.000.000 행을 사용하여 예제를 다시 실행하면 repartition 다음에 데이터가 실제로 균등하게 분산됩니다.

많은 양의 데이터 변환 작업을 할 때 데이터 왜곡은 종종 큰 문제가되며 데이터를 다시 분할하면 데이터를 임의로 셔플해야하기 때문에 추가 시간이 소요됩니다. 다음과 같은 변환 단계를 더 빨리 수행 할 수 있기 때문에 때때로 벌칙을 적용하는 것이 좋습니다.

+0

답장을 보내 주셔서 감사합니다. –