2016-08-21 3 views
0

나는 Golang에서 mapreduce와 같은 메소드를 구현하려고합니다.Golang : 주기적으로 종속 된 채널 닫기

  • 지도 노동자는 매퍼 출력 채널이 다음 하나의 goroutine 읽을 수있는 매퍼 출력 채널

  • 에 매퍼 입력 채널과 출력 일회성 항목을 끌어 다음과 같이 내 디자인입니다. 이 루틴은 이전에 본 키 - 값 쌍의 맵을 유지 관리합니다. 매퍼 출력의 다음 항목에 일치하는 키가 있으면 일치하는 키가있는 새 값과 이전 값을 모두 축소 입력 채널로 보냅니다.

  • reduce-input 파이프 라인은 두 값을 하나의 키 - 값 쌍으로 줄이고 결과를 동일한 맵 출력 채널에 제출합니다.

이것은 매퍼 출력과 축소 입력 사이의 순환 종속성을 초래하며, 이제는 매퍼 출력이 완료되었음을 알리는 (채널을 닫는) 방법을 알지 못합니다.

주기적 종속성을 깨거나 이러한 순환 동작으로 채널을 닫을 때를 아는 가장 좋은 방법은 무엇입니까?

아래 코드는 맵 출력 채널과 서로 대기하는 감소 입력 채널에 교착 상태가 있습니다.

type MapFn func(input int) (int, int) 
type ReduceFn func(a int, b int) int 

type kvPair struct { 
    k int 
    v int 
} 

type reducePair struct { 
    k int 
    v1 int 
    v2 int 
} 

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) { 
    inputMapChan := make(chan int, len(input)) 
    outputMapChan := make(chan *kvPair, len(input)) 
    reduceInputChan := make(chan *reducePair) 
    outputMapMap := make(map[int]int) 
    go func() { 
     for v := range input { 
      inputMapChan <- v 
     } 
     close(inputMapChan) 
    }() 
    for i := 0; i < nMappers; i++ { 
     go func() { 
      for v := range inputMapChan { 
       k, v := mapFn(v) 
       outputMapChan <- &kvPair{k, v} 
      } 
     }() 
    } 
    for i := 0; i < nReducers; i++ { 
     go func() { 
      for v := range reduceInputChan { 
       reduceValue := reduceFn(v.v1, v.v2) 
       outputMapChan <- &kvPair{v.k, reduceValue} 
      } 
     }() 
    } 
    for v := range outputMapChan { 
     key := v.k 
     value := v.v 
     other, ok := outputMapMap[key] 
     if ok { 
      delete(outputMapMap, key) 
      reduceInputChan <- &reducePair{key, value, other} 
     } else { 
      outputMapMap[key] = value 
     } 
    } 
    return outputMapMap, nil 
} 
+0

경우에도 마찬가지입니다. 제 생각에'mapreduce'는'map'과'reduce' 두 단계를 포함해야합니다. –

+0

어려움은 '감소'단계의 재귀 적 특성에서 비롯됩니다. reduce 연산은 더 많은 fold를 필요로 할 수 있으며, 이것은 임의의 횟수로 발생할 수 있습니다. reduce를위한 출력이 reduce를 위해 reduce 입력으로 흘러가도록하는 메커니즘이 필요합니다. –

+0

'reduce '단계에 자연 재귀가 없습니다. 매핑 된 중간 결과는 세트로 분할되어야하며 모든 세트는 감속기별로 처리 될 수 있습니다. 왜 mapreduce가 배포 시스템에서 작동하는지. –

답변

0

이 시도 : 영원히 대기 원인에는`Reducers`의`outputMapChan`이 폐쇄되지 않은, 존재하지

package main 

import "fmt" 
import "sync" 
import "sync/atomic" 
import "runtime" 
import "math/rand" 
import "time" 

type MapFn func(input int) *kvPair 
type ReduceFn func(a int, b int) int 

type kvPair struct { 
    k int 
    v int 
} 

type reducePair struct { 
    k int 
    v1 int 
    v2 int 
} 

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) { 
    inputMapChan := make(chan int, len(input)) 
    outputMapChan := make(chan *kvPair, len(input)) 
    reduceInputChan := make(chan *reducePair) 
    outputMapMap := make(map[int]int) 

    wg := sync.WaitGroup{} 
    wg.Add(1) 
    go func() { 
     defer wg.Done() 
     for _, v := range input { 
      inputMapChan <- v 
     } 
     close(inputMapChan) 
    }() 

    for i := 0; i < nMappers; i++ { 
     wg.Add(1) 
     go func() { 
      defer wg.Done() 
      for v := range inputMapChan { 
       outputMapChan <- mapFn(v) 
      } 
     }() 
    } 

    finished := false 
    go func() { 
     wg.Wait() 
     finished = true 
    }() 

    var count int64 
    for i := 0; i < nReducers; i++ { 
     go func() { 
      for v := range reduceInputChan { 
       reduceValue := reduceFn(v.v1, v.v2) 
       outputMapChan <- &kvPair{v.k, reduceValue} 
       atomic.AddInt64(&count, -1) 
      } 
     }() 
    } 

    wg2 := sync.WaitGroup{} 
    wg2.Add(1) 
    go func() { 
     defer wg2.Done() 
     for { 
      select { 
      default: 
       if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 { 
        return 
       } 
       //runtime.Gosched() 
      case v := <-outputMapChan: 
       key := v.k 
       value := v.v 
       if other, ok := outputMapMap[key]; ok { 
        delete(outputMapMap, key) 
        atomic.AddInt64(&count, 1) 
        reduceInputChan <- &reducePair{key, value, other} 
       } else { 
        outputMapMap[key] = value 
       } 
      } 
     } 
    }() 

    wg2.Wait() 
    return outputMapMap, nil 
} 

func main() { 
    fmt.Println("NumCPU =", runtime.NumCPU()) 
    t := time.Now() 
    a := rand.Perm(1000000) 
    //a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2} 
    m, err := MapReduce(mp, rdc, a, 2, 2) 
    if err != nil { 
     panic(err) 
    } 
    fmt.Println(time.Since(t)) //883ms 
    fmt.Println(m) 
    fmt.Println("done.") 
} 

func mp(input int) *kvPair { 
    return &kvPair{input & 7, input >> 3} 
} 
func rdc(a int, b int) int { 
    b <<= 3 
    if a != 0 { 
     b |= a 
    } 
    return b 
} 
+0

@ jack-reilly 나는 이것이 도움이되기를 바랍니다 –

+1

아주 잘 했어! 이러한 블록을 동기화하기 위해 대기 그룹 및 원자 카운터를 사용하는 방법을 보여 주셔서 감사합니다. –