나는 비교적 단순한 Spark 작업을 Scala에서 작성했다.이 작업은 S3에서 일부 데이터를 읽고, 일부 변환과 집계를 수행 한 다음 결과를 저장소에 저장한다.Spark : RDD 요소를 청크로 분할
마지막 단계에서 내 도메인 모델의 RDD가 있으며,이를 저장소의 대량 삽입을 할 수 있도록 요소 묶음으로 그룹화하고 싶습니다.
나는이를 달성하기 위해 RDDFunctions.sliding
방법을 사용했으며 거의 잘 작동합니다. 다음은 코드의 단순화 된 버전입니다.
val processedElements: RDD[DomainModel] = _
RDDFunctions.fromRDD(processedElements)
.sliding(500, 500)
.foreach { elementsChunk =>
Await.ready(repository.bulkInsert(elementsChunk), 1.minute)
}
예를 들어 1020 개의 요소가있는 경우 문제가 1,000 개 요소 만 내 리포지토리에 저장됩니다. 창 크기가 나머지 요소의 양보다 큰 경우 슬라이딩처럼 추가 요소를 무시합니다.
해결 방법이 있습니까? 그렇지 않은 경우 RDDFunctions.sliding
을 사용하지 않고 동일한 동작을 수행 할 수있는 다른 방법이 있습니까?
처음에 이렇게 해보았습니다. 불행히도, 이것은 많은 것을 메모리에 보관하는 결과로 메모리 문제를 일으켰습니다. – Alex