2017-05-09 4 views
0

나는 비교적 단순한 Spark 작업을 Scala에서 작성했다.이 작업은 S3에서 일부 데이터를 읽고, 일부 변환과 집계를 수행 한 다음 결과를 저장소에 저장한다.Spark : RDD 요소를 청크로 분할

마지막 단계에서 내 도메인 모델의 RDD가 있으며,이를 저장소의 대량 삽입을 할 수 있도록 요소 묶음으로 그룹화하고 싶습니다.

나는이를 달성하기 위해 RDDFunctions.sliding 방법을 사용했으며 거의 ​​잘 작동합니다. 다음은 코드의 단순화 된 버전입니다.

val processedElements: RDD[DomainModel] = _ 
RDDFunctions.fromRDD(processedElements) 
    .sliding(500, 500) 
    .foreach { elementsChunk => 
     Await.ready(repository.bulkInsert(elementsChunk), 1.minute) 
    } 

예를 들어 1020 개의 요소가있는 경우 문제가 1,000 개 요소 만 내 리포지토리에 저장됩니다. 창 크기가 나머지 요소의 양보다 큰 경우 슬라이딩처럼 추가 요소를 무시합니다.

해결 방법이 있습니까? 그렇지 않은 경우 RDDFunctions.sliding을 사용하지 않고 동일한 동작을 수행 할 수있는 다른 방법이 있습니까?

답변

0

foreachPartition과 수동 배치 관리를 사용할 수 없습니까?

fromRDD.foreachPartition(items: Iterator[DomainModel] => { 
    val batch = new ArrayBuffer[DomainModel](BATCH_SIZE) 
    while (items.hasNext) { 
    if (batch.size >= BATCH_SIZE) { 
     bulkInsert(batch) 
     batch.clear() 
    } 
    batch += items.next 
    } 
    if (!batch.isEmpty) { 
     bulkInsert(batch) 
    } 
}) 
+0

처음에 이렇게 해보았습니다. 불행히도, 이것은 많은 것을 메모리에 보관하는 결과로 메모리 문제를 일으켰습니다. – Alex

0

당신이있어 바로 그 불꽃의 sliding 창 크기가 RDDFunctions doc에 따라, 나머지 항목의 수를 초과하는 경우 (스칼라는 달리), 빈 RDD을 생성합니다. Spark도 Scala의 grouped과 동등하지 않습니다.

만들 그룹 수를 알고있는 경우 적용 가능한 해결 방법은 modulo 필터를 사용하여 RDD를 분할하는 것입니다. 다음은 RDD를 5 개의 그룹으로 나누는 간단한 예입니다.

val rdd = sc.parallelize(Seq(
    (0, "text0"), (1, "text1"), (2, "text2"), (3, "text2"), (4, "text2"), (5, "text5"), 
    (6, "text6"), (7, "text7"), (8, "text8"), (9, "text9"), (10, "text10"), (11, "text11") 
)) 

def g(n:Int)(x: Int): Boolean = { x % 5 == n } 

val rddList = (0 to 4).map(n => rdd.filter(x => g(n)(x._1))) 

(0 to 4).foreach(n => rddList(n).collect.foreach(println)) 

(0,text0) 
(5,text5) 
(10,text10) 

(1,text1) 
(6,text6) 
(11,text11) 

(2,text2) 
(7,text7) 

(3,text2) 
(8,text8) 

(4,text2) 
(9,text9) 
+0

불행히도 얼마나 많은 그룹이 있을지 모릅니다. 입력이 끊임없이 변하기 때문에 우리는 다른 수의 그룹으로 끝날 수 있습니다. – Alex