문제좋은 방법은 스칼라를 이용하여 스캔 데이터를 상기 입력 데이터가 레코드의 2 개 종류가 그들과 <code>R</code><code>W</code> 호출 할
촉발.
현재 레코드의 유형이 W
인 경우이 데이터를 위에서 아래로 트래버스해야합니다. 맵과 병합해야합니다 (workMap
). 해당 W 유형 레코드의 키가 이미 맵에 있으면이 레코드의 값이 추가되고 그렇지 않으면 workMap
으로 새 항목이 작성됩니다.
현재 레코드의 형식이 R
인 경우이 레코드까지 계산 된 workMap
이 현재 레코드에 연결됩니다. 이 기록의 순서는 예를 들어
W
있다
W1- a -> 2
W2- b -> 3
W3- a -> 4
R1
W4- c -> 1
R2
W5- c -> 4
; 그리고 R1 및 R2는이 함수의 끝에서 입력 R
으로하며, I가 있어야 다음 - I 그 시점까지 산출 된 중간 workMap
들에 연결된 모든 R 형 기록 할
R1 - { a -> 6,
b -> 3 } //merged(W1, W2, W3)
R2 - { a -> 6,
b -> 3,
c -> 1 } //merged(W1, W2, W3, W4)
{ a -> 6,
b -> 3,
c -> 5 } //merged(W1, W2, W3, W4, W5)
; 그리고 마지막 레코드 이후의 마지막 workMap
이 처리됩니다.
case class ReportKey (
// the type of record - report or work
rType: Int,
date: String,
.....
)
내가 도움을 요청하고이 방법 두 가지 문제가 있습니다
-
def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
Iterator[(ReportKey, ReportVal)] = {
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
val reportList = mutable.ArrayBuffer.empty[(ReportKey, Reportval)]
while (itr.hasNext) {
val temp = itr.next()
val (iKey, iVal) = (temp._1, temp._2)
if (iKey.recordType == reportType) {
//creates a new (ReportKey, Reportval)
reportList += getNewReportRecord(workMap, iKey, iVal)
}
else {
//if iKey is already present, merge the values
//other wise adds a new entry
updateWorkMap(workMap, iKey, iVal)
}
}
val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)
reportList.iterator ++ workList.iterator
}
ReportKey
클래스는 다음과 같이이다 -
내가 작성한 코드입니다 -
- 나는 가지고있다. o
reportList
-workMap
과 연결된R
유형 레코드 목록을 보관합니다. 데이터가 커짐에 따라reportList
도 커지고 나는OutOfMemoryException
으로 실행 중입니다. - 동일한 데이터 구조에
reportList
과workMap
개의 레코드를 결합한 다음 반환해야합니다. 다른 우아한 방법이 있다면이 디자인을 변경하는 것이 좋습니다.
- 나는 불꽃을 사용하고 있습니다. 함수 calcPerPartition
은 RDD의 mapPartitions에 대한 인수로 전달됩니다. 나중에 각 파티션에서 workMap
을 추가로 계산해야합니다.
...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
itr.scanLeft[Option[(ReportKey, Reportval)]](
None)((acc: Option[(ReportKey, Reportval)],
curr: (InputKey, InputVal)) => {
if (curr._1.recordType == reportType) {
val rec = getNewReportRecord(workMap, curr._1, curr._2)
Some(rec)
}
else {
updateWorkMap(workMap, curr._1, curr._2)
None
}
})
val reportList = scan.filter(_.isDefined).map(_.get)
//workMap is still empty after the scanLeft.
...
물론, 난에 입력 데이터에 reduce
작업을 수행 할 수 있습니다 -
은 내가 각 파티션에서 workMap
의를 반환하지 않는 경우, 문제는이 같은 훨씬 쉬워집니다 것을 알고있다 최종 workMap
을 얻지 만 데이터를 두 번 봐야합니다.입력 데이터 세트가 거대하다는 것을 고려해 볼 때, 나는 그것을 피하고 싶다.
하지만 불행히도 나는 후자의 단계에서 workMap
이 필요합니다.
위의 문제를 해결하는 더 좋은 방법이 있습니까? 문제 2를 전혀 해결할 수 없다면 (according to this) R
레코드 (reportList
)를 목록에 저장하거나 데이터를 두 번 이상 스캔하지 않도록 할 수있는 다른 방법이 있습니까?