2017-12-07 4 views
1

일부 goroutines에 일부 부하를 분산하고 싶습니다. 사전에 작업의 수를 알고 있다면 쉽게 구성 할 수 있습니다. 예를 들어 대기 그룹과 함께 팬을 만날 수 있습니다.golang의 재귀 동시성

nTasks := 100 
nGoroutines := 10 

// it is important that this channel is not buffered 
ch := make(chan *Task) 
done := make(chan bool) 
var w sync.WaitGroup 
// Feed the channel until done 
go func() { 
    for i:= 0; i < nTasks; i++ { 
     task := getTaskI(i) 
     ch <- task 
    } 
    // as ch is not buffered once everything is read we know we have delivered all of them 
    for i:=0; i < nGoroutines; i++ { 
     done <- false 
    } 
}() 
for i:= 0; i < nGoroutines; i ++ { 
    w.Add(1) 
    go func() { 
     defer w.Done() 
     select { 
     case task := <-ch: 
      doSomethingWithTask(task) 
     case <- done: 
      return 
     } 
    }() 
} 
w.Wait() 
// All tasks done, all goroutines closed 

그러나 제 경우에는 각 작업이 더 많은 작업을 반환합니다. 예를 들어 크롤링 된 웹에서 모든 링크를받는 크롤러라고 말하십시오. 필자의 첫 번째 직감은 수행 된 작업 수와 보류중인 작업 수를 추적하는 메인 루프를 만드는 것이 었습니다. 완료되면 나는 모든 신호에 완료 신호를 보낸다 :

nGoroutines := 10 
ch := make(chan *Task, nGoroutines) 
feedBackChannel := make(chan * Task, nGoroutines) 
done := make(chan bool) 

for i:= 0; i < nGoroutines; i ++ { 
    go func() { 
     select { 
     case task := <-ch: 
      task.NextTasks = doSomethingWithTask(task) 
      feedBackChannel <- task 
     case <- done: 
      return 
     } 
    }() 
} 

// seed first task 
ch <- firstTask 
nTasksRemaining := 1 

for nTasksRemaining > 0 { 
    task := <- feedBackChannel 
    nTasksRemaining -= 1 
    for _, t := range(task.NextTasks) { 
     ch <- t 
     nTasksRemaining++ 
    } 
} 
for i:=0; i < nGoroutines; i++ { 
    done <- false 
} 

그러나 이것은 교착 상태를 만든다. 예를 들어 NextTasks가 goroutines의 수보다 크면 첫 번째 작업이 완료되면 주 루프가 멈 춥니 다. 그러나 mainLoop이 쓰기를 기다리고 있기 때문에 피드 백이 차단되어 첫 번째 작업을 완료 할 수 없습니다.

비공식적으로 채널에 게시하는 것이 쉬운 방법 중 하나입니다. feedBackChannel <- task을 수행하는 대신 go func() {feedBackChannel <- task}()을 수행하십시오. 자, 이것은 끔찍한 해킹처럼 느껴집니다. 특별히 수십만 개의 작업이있을 수 있기 때문에 특별히 그렇습니다.

이 교착 상태를 피하는 좋은 방법은 무엇입니까? 나는 동시성 패턴을 찾았지만, 대부분은 나중에 패닝 (fanning out)이나 파이프 라인 (pipeline)과 같은 더 단순한 것들이다.

+1

설명이 너무 복잡하기는하지만 완전히 이해할 수있는 메모가 2 개 있습니다. 1. goroutine 내부에서 waitGroup.Add()를 잘못 수행 한 경우,이를 호출하기 전에 완료해야합니다. goroutine이 시작되면 즉시 waitGroup.Done()을 호출합니다. 2. 왜 피드백 채널이 필요한지 명확하지 않습니다. 나에게 필요에 따라 새로운 grorutines을 생성하고 메인 스레드에서 watGroup.Wait()을 수행하면된다. 하지만 몇 가지 요구 사항이 누락되었을 수 있습니다. –

+0

@AlexanderTrakhimenok 새로운 goroutines을 만들면 효과가 있지만 goroutines를 재사용하고 그 양을 제한하면 리소스를 덜 소비하게 될 것입니다. 맞습니까? 제 경우에는 수십만 건의 작업을 기대하고 있습니다. (BTW, 나는 기다림 그룹을 고정시켰다.) –

+0

Go 루틴은 매우 가볍다. 문제없이 수백만 개의 골 루틴을 생성하는 것이 일반적이다. 속도 제한 패턴을 살펴볼 수도 있지만 https://gobyexample.com/rate-limiting –

답변

0

문제를 올바르게 이해하면 솔루션이 매우 복잡합니다. 다음은 몇 가지 사항입니다. 희망이 도움이됩니다.

  • 의견에 언급 된 사람들이 goroutine을 실행하는 것은 저렴합니다 (메모리와 스위치가 모두 OS 레벨보다 저렴합니다). 사용자는 10 만 개를 가질 수 있습니다. 당신이 일하는 goroutines을 갖고 싶어하는 이유를 가정 해 봅시다.
  • 완료 채널 대신 ch 채널을 닫고 select 대신 채널을 통해 range 채널을 수신 할 수 있습니다.
  • chfeedBackChannel을 분리 할 시점이 없으므로 보유한 모든 작업을 ch으로 밀어 넣고 용량을 늘리십시오.
  • 새 작업을 대기열에 추가 할 때 데드락이 발생할 수 있습니다. 내 솔루션은 꽤 순진합니다. 오버플로가 발생하지 않을 때까지 용량을 늘리십시오 (cap(ch) - len(ch) < threshold 일 경우 경고를 기록 할 수도 있음). 1 백만 개의 용량을 가진 포인터 (포인터)를 만들면 숫양이 약 8 * 1e6 ~= 8MB 걸립니다.
+0

나는 생각했다. 얼마나 많은 작업이 남아 있는지 추적하는 방법의 feedBackChannel. 나는 어떤 시점에서 작업이 부족하면 goroutines가 닫히기 시작한다는 것을 의미하기 때문에 기본값을 사용하여 select를 수행 할 수는 없습니다. 그래서 한 가지 작업으로 만 집중화하려고했습니다. 채널을 많이 버퍼링하면 실제로 실제로 구현할 수 있습니다. 채널의 메모리 사용 공간은 정말로 작습니다. 내 특별한 경우에 내가하고 싶은 작업은 IO가 아니라 CPU입니다 (일부 인덱싱을 병렬 처리하기 때문에). goroutines를 열면 항상 영향을 미칩니다. –

+0

@GabrielFurstenheim 문제가 발생했는지 여부를 모르는 경우,'ch'에 모든 채널을 병합하면'len (ch)'를 사용하여 작업 수를 얻을 수 있습니다. 나중에 시간이 있다면 코드 샘플을 추가하려고합니다. –