2017-10-20 9 views
0

몇 단계를 구현하기 위해 https://blog.golang.org/pipelines 문서를 따르고 있습니다.골란에서 다음 단계를 위해 지연을 도입하는 파이프 라인 단계를 작성하는 방법은 무엇입니까?

이벤트가 파이프 라인의 다음 단계에서 전달되기까지 몇 초의 지연을 도입하는 단계가 필요합니다.

아래의 코드에 대한 우려는 이벤트를 전달하기 전에 time.Sleep() 함수를 무한대로 실행하는 것입니다. 이 작업을 수행하는 더 좋은 방법이 있습니까?

감사합니다.

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    out := make(chan *Bar, 10000) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 

답변

1

다른 채널을 사용하여 루프가 생성 할 수있는 활성 고 루틴의 수를 제한 할 수 있습니다.

const numRoutines = 10 

func fooStage(inChan <-chan *Bar) <-chan *Bar { 
    out := make(chan *Bar, 10000) 
    routines := make(chan struct{}, numRoutines) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       routines <- struct{}{} 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
        <-routines 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 
+0

감사합니다. 좋은 생각 인 것 같습니다. 내가 볼 수있는 유일한 단점은 '루틴'채널이 차단되면 이벤트가 5 초 이상 지연된다는 것입니다. 비록 내가 생각하는 이벤트 안에 타임 스탬프가 없어도 그것을 다루는 좋은 방법이 없습니다. – ultimoo

+1

@ultimoo 5 초 대기로 인해 수백 또는 수천 개의 goroutines를 쉽게 실행할 수있어 실제 이벤트 대기 시간이 줄어 듭니다. 이런 식으로 코드를 읽는 것만으로는 판단하기가 어렵습니다. 테스트 및 벤치마킹은 실제로 어떻게 작동 하는지를 실제로 결정하는 데 필요합니다. – RayfenWindspear

+0

물론 이것은 더 많은 실험입니다. 나의 직관은이 goroutine을 몇 천 개 돌리는 것이 OK라는 것입니다. 왜냐하면 그들이하는 일은'time.Sleep()'으로 실행되기 때문에 대부분의 수명 동안 프로세서에서 스케줄링되지 않기 때문입니다. – ultimoo

0

당신은 time.Ticker를 사용할 수 있습니다

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    //... some code 
    ticker := time.NewTicker(5 * time.Second) 
    <-ticker // the delay, probably need to call twice 
    ticker.Stop() 
    close(ticker.C) 
    //... rest code 
} 
+0

후속 이벤트에서 이것이 어떻게 작동하는지 설명 할 수 있습니까? inChan에서 두 가지 이벤트가 발생하면 두 번째 이벤트가 10 초 동안 기다리지 않습니까? – ultimoo

+0

'-ticker는 지정된 기간보다 자주 리턴되어야합니다. 그냥 시도 해 봐 –

1

수동 goroutines의 수를 해결할 수 있습니다 - 시작 단지 당신이 필요로하는 수.

func sleepStage(in <-chan *Bar) (out <-chan *Bar) { 
    out = make(<-chan *Bar) 
    wg := sync.WaitGroup 
    for i:=0; i < N; i++ { // Number of goroutines in parallel 
      wg.Add(1) 
      go func(){ 
       defer wg.Done() 
       for e := range in { 
        time.Sleep(5*time.Seconds) 
        out <- e 
       } 
      }() 
     } 
     go func(){} 
      wg.Wait() 
      close(out) 
     }() 
     return out 
    }