2016-06-05 4 views
2

간단한 이벤트 기반 작업자를 구현하기 위해 golang을 사용하고 있습니다. 내 근로자가 일부 필요하기 때문에,골란의 이벤트 드리븐 패턴

ch <- data 
fmt.Println(someGlobalMap[data.key]) 

당신이 볼 수 있듯이 :

go func() { 
     for { 
      select { 
      case data := <-ch: 
       time.Sleep(1) 
       someGlobalMap[data.key] = data.value 
      } 
     } 
    }() 

그리고 주요 기능은 여러 goroutines을 만들 것이며, 그들 각각은 다음과 같이 일을 할 것입니다 : 그것은이처럼 작업을 수행 할 시간, 내 주요 기능에 아무런 결과가 없습니다.이 워크 플로우를 어떻게 적절하게 제어 할 수 있습니까?

+0

지도 읽기는 쓰기 goroutines와 동기화되어야합니다. select 절에서'ch <- data'를 다른 경우로 추가하십시오. – Nadh

답변

5

편집 : 나는 당신이 주요 많은 생산 goroutines을 시작할 것이라고 언급 참조 질문을 오해했을 수 있습니다. 나는 그것이 많은 소비자 goroutines이고, 하나의 생산자라고 생각했다. 여기에 답을 남겨두면 다른 사람들이 그 패턴을 찾는 데 유용 할 수 있습니다. 단, 글 머리 기호는 여전히 여러분의 경우에 적용됩니다.

올바른 사용 사례를 이해하면 채널을 보내고 즉시 결과를 읽을 수는 없습니다. 작업자가 언제 전송을 처리 할 것인지, goroutines간에 통신해야하는지, 채널을 통해 이루어 지는지 여부를 알 수 없습니다. 반환 값이있는 함수를 호출하는 것이 시나리오에서는 작동하지 않는다고 가정하면 실제로 작업자에게 보낼 필요가있는 경우 결과가 나올 때까지 차단하고 데이터 구조의 일부로 채널을 보내고, 즉, 송신 이후에 나타납니다

resCh := make(chan Result) 
ch <- Data{key, value, resCh} 
res := <- resCh 

하지만 당신은 아마, 대신에 독립적 인 단계의 파이프 라인으로 작업을 분해 내가 원래의 대답에 링크 된 블로그 게시물을 참조하려고한다.


나는 그것이 하나의 프로듀서라고 생각 원래 대답 - 여러 소비자/노동자 패턴 :

이 이동의 goroutines 및 채널 의미가 매우 적합있는 일반적인 패턴이다. 염두에 두어야 할 몇 가지 사항이 있습니다.

  • 주 기능은 goroutines가 완료 될 때까지 자동으로 대기하지 않습니다. 메인에 할 일이 없다면 프로그램이 종료되고 결과가 없습니다.

  • 사용하는 글로벌 맵은 스레드로부터 안전하지 않습니다. 뮤텍스를 통해 액세스를 동기화해야하지만 더 나은 방법이 있습니다. 이미 동기화 된 결과에 출력 채널을 사용하십시오.

  • 채널을 통해 for..range를 사용할 수 있으며 여러 goroutines간에 안전하게 채널을 공유 할 수 있습니다. 우리가 보게 되겠지만,이 패턴은 쓰기가 아주 우아합니다.

놀이터 : 오류 처리, 초기 취소 등을 소개하는 이동 파이프 라인 및 동시성 패턴에 대한 자세한 내용은 https://play.golang.org/p/WqyZfwldqp

: 유스 케이스에 대한 https://blog.golang.org/pipelines

댓글 코드는 언급 : 작업의 양을 미리 알 수없는 실제 시나리오에서

// could be a command-line flag, a config, etc. 
const numGoros = 10 

// Data is a similar data structure to the one mentioned in the question. 
type Data struct { 
    key string 
    value int 
} 

func main() { 
    var wg sync.WaitGroup 

    // create the input channel that sends work to the goroutines 
    inch := make(chan Data) 
    // create the output channel that sends results back to the main function 
    outch := make(chan Data) 

    // the WaitGroup keeps track of pending goroutines, you can add numGoros 
    // right away if you know how many will be started, otherwise do .Add(1) 
    // each time before starting a worker goroutine. 
    wg.Add(numGoros) 
    for i := 0; i < numGoros; i++ { 
     // because it uses a closure, it could've used inch and outch automaticaly, 
     // but if the func gets bigger you may want to extract it to a named function, 
     // and I wanted to show the directed channel types: within that function, you 
     // can only receive from inch, and only send (and close) to outch. 
     // 
     // It also receives the index i, just for fun so it can set the goroutines' 
     // index as key in the results, to show that it was processed by different 
     // goroutines. Also, big gotcha: do not capture a for-loop iteration variable 
     // in a closure, pass it as argument, otherwise it very likely won't do what 
     // you expect. 
     go func(i int, inch <-chan Data, outch chan<- Data) { 
      // make sure WaitGroup.Done is called on exit, so Wait unblocks 
      // eventually. 
      defer wg.Done() 

      // range over a channel gets the next value to process, safe to share 
      // concurrently between all goroutines. It exits the for loop once 
      // the channel is closed and drained, so wg.Done will be called once 
      // ch is closed. 
      for data := range inch { 
       // process the data... 
       time.Sleep(10 * time.Millisecond) 
       outch <- Data{strconv.Itoa(i), data.value} 
      } 
     }(i, inch, outch) 
    } 

    // start the goroutine that prints the results, use a separate WaitGroup to track 
    // it (could also have used a "done" channel but the for-loop would be more complex, with a select). 
    var wgResults sync.WaitGroup 
    wgResults.Add(1) 
    go func(ch <-chan Data) { 
     defer wgResults.Done() 

     // to prove it processed everything, keep a counter and print it on exit 
     var n int 
     for data := range ch { 
      fmt.Println(data.key, data.value) 
      n++ 
     } 

     // for fun, try commenting out the wgResults.Wait() call at the end, the output 
     // will likely miss this line. 
     fmt.Println(">>> Processed: ", n) 
    }(outch) 

    // send work, wherever that comes from... 
    for i := 0; i < 1000; i++ { 
     inch <- Data{"main", i} 
    } 

    // when there's no more work to send, close the inch, so the goroutines will begin 
    // draining it and exit once all values have been processed. 
    close(inch) 

    // wait for all goroutines to exit 
    wg.Wait() 

    // at this point, no more results will be written to outch, close it to signal 
    // to the results goroutine that it can terminate. 
    close(outch) 

    // and wait for the results goroutine to actually exit, otherwise the program would 
    // possibly terminate without printing the last few values. 
    wgResults.Wait() 
} 

,에 채널 올 수의 폐쇄를 예를 들어 SIGINT 신호. 채널이 닫힌 후에 코드 경로가 작동을 보낼 수 있는지 확인하십시오.

+0

이동 중에도 시뮬레이션을 수행 했습니까? 코드를 공유하거나 사용했던 추가 golang 기능이나 라이브러리를 말할 수 있습니까? –