2017-03-25 21 views
1

문제좋은 방법은 스칼라를 이용하여 스캔 데이터를 상기 입력 데이터가 레코드의 2 개 종류가 그들과 <code>R</code><code>W</code> 호출 할

촉발.

현재 레코드의 유형이 W 인 경우이 데이터를 위에서 아래로 트래버스해야합니다. 맵과 병합해야합니다 (workMap). 해당 W 유형 레코드의 키가 이미 맵에 있으면이 레코드의 값이 추가되고 그렇지 않으면 workMap으로 새 항목이 작성됩니다.

현재 레코드의 형식이 R 인 경우이 레코드까지 계산 된 workMap이 현재 레코드에 연결됩니다. 이 기록의 순서는 예를 들어

-

W1, W2, W3, W4 및 W5의 유형 W있다
W1- a -> 2 
W2- b -> 3 
W3- a -> 4 
R1 
W4- c -> 1 
R2 
W5- c -> 4 

; 그리고 R1 및 R2는이 함수의 끝에서 입력 R

으로하며, I가 있어야 다음 - I 그 시점까지 산출 된 중간 workMap들에 연결된 모든 R 형 기록 할

R1 - { a -> 6, 
     b -> 3 } //merged(W1, W2, W3) 
R2 - { a -> 6, 
     b -> 3, 
     c -> 1 } //merged(W1, W2, W3, W4) 
{ a -> 6, 
    b -> 3, 
    c -> 5 } //merged(W1, W2, W3, W4, W5) 

; 그리고 마지막 레코드 이후의 마지막 workMap이 처리됩니다.

case class ReportKey (
         // the type of record - report or work 
         rType: Int, 
         date: String, 
         ..... 
         ) 

내가 도움을 요청하고이 방법 두 가지 문제가 있습니다

-

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]): 
    Iterator[(ReportKey, ReportVal)] = { 

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal] 
    val reportList = mutable.ArrayBuffer.empty[(ReportKey, Reportval)] 

    while (itr.hasNext) { 
     val temp = itr.next() 
     val (iKey, iVal) = (temp._1, temp._2) 

     if (iKey.recordType == reportType) { 
     //creates a new (ReportKey, Reportval) 
     reportList += getNewReportRecord(workMap, iKey, iVal) 
     } 
     else { 
     //if iKey is already present, merge the values 
     //other wise adds a new entry 
     updateWorkMap(workMap, iKey, iVal) 
     } 
    } 
    val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport) 

    reportList.iterator ++ workList.iterator 
    } 

ReportKey 클래스는 다음과 같이이다 -

여기

내가 작성한 코드입니다 -

  1. 나는 가지고있다. o reportList - workMap과 연결된 R 유형 레코드 목록을 보관합니다. 데이터가 커짐에 따라 reportList도 커지고 나는 OutOfMemoryException으로 실행 중입니다.
  2. 동일한 데이터 구조에 reportListworkMap 개의 레코드를 결합한 다음 반환해야합니다. 다른 우아한 방법이 있다면이 디자인을 변경하는 것이 좋습니다.

- 나는 불꽃을 사용하고 있습니다. 함수 calcPerPartition은 RDD의 mapPartitions에 대한 인수로 전달됩니다. 나중에 각 파티션에서 workMap을 추가로 계산해야합니다.

... 
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]      
itr.scanLeft[Option[(ReportKey, Reportval)]](
    None)((acc: Option[(ReportKey, Reportval)], 
    curr: (InputKey, InputVal)) => { 

    if (curr._1.recordType == reportType) { 
    val rec = getNewReportRecord(workMap, curr._1, curr._2) 
    Some(rec) 
    } 
    else { 
    updateWorkMap(workMap, curr._1, curr._2) 
    None 
    } 
}) 

val reportList = scan.filter(_.isDefined).map(_.get) 
//workMap is still empty after the scanLeft. 
... 

물론, 난에 입력 데이터에 reduce 작업을 수행 할 수 있습니다 -

은 내가 각 파티션에서 workMap의를 반환하지 않는 경우, 문제는이 같은 훨씬 쉬워집니다 것을 알고있다 최종 workMap을 얻지 만 데이터를 두 번 봐야합니다.입력 데이터 세트가 거대하다는 것을 고려해 볼 때, 나는 그것을 피하고 싶다.

하지만 불행히도 나는 후자의 단계에서 workMap이 필요합니다.

위의 문제를 해결하는 더 좋은 방법이 있습니까? 문제 2를 전혀 해결할 수 없다면 (according to this) R 레코드 (reportList)를 목록에 저장하거나 데이터를 두 번 이상 스캔하지 않도록 할 수있는 다른 방법이 있습니까?

답변

0

reportListworkMap을 단일 데이터 구조에 결합하는 것을 피할 수 있다면 더 나은 디자인을 아직 갖고 있지 않지만 목록에 R 레코드를 저장하는 것을 확실히 피할 수 있습니다. 여기

우리가 위의 질문에서 calcPerPartition을 다시 쓸 수있는 방법입니다 -

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]): 
    Iterator[Option[(ReportKey, ReportVal)]] = { 

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal] 
    var finalWorkMap = true 

    new Iterator[Option[(ReportKey, ReportVal)]](){ 
     override def hasNext: Boolean = itr.hasNext 

     override def next(): Option[(ReportKey, ReportVal)] = { 
      val curr = itr.next() 
      val iKey = curr._1 
      val iVal = curr._2 
      val eventKey = EventKey(openKey.date, openKey.symbol) 

      if (iKey.recordType == reportType) { 
       Some(getNewReportRecord(workMap, iKey, iVal)) 
      } 
      else { 
       //otherwise update the generic interest map but don't accumulate anything 
       updateWorkMap(workMap, iKey, iVal) 
       if (itr.hasNext) { 
       next() 
       } 
       else { 
        if(finalWorkMap){ 
        finalWorkMap = false //because we want a final only once 
        Some(workMap.map(convertToReport)) 
        } 
        else { 
        None 
        } 

       } 
      } 
     } 
    } 
    } 

을 대신 목록에 결과를 저장, 우리는 반복자를 정의했다. 이것으로 우리는이 문제와 관련된 대부분의 메모리 문제를 해결했습니다.