2017-11-14 7 views
1

저의 목표는 공통 포맷을 공유하는 하나 또는 여러 개의 csv 파일을 읽고 csv 데이터의 파티션 컬럼을 기반으로 개별 파일에 작성하는 것입니다. 마지막 열이 파티션이고 데이터가 정렬되지 않고 주어진 파티션이 여러 파일에서 발견되도록 허용하십시오. 하나 개의 파일의 예 :이 방법은 지칠대로 지친 XY 문제 냄새Golang의 파티션 컬럼에서 동시에 여러 csv 파일을 작성하십시오.

fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,04 
22df9,abc,def,2017,11,06,03 
1d243,abc,def,2017,11,06,02 

경우에, 나는 조정 드리겠습니다. 내가 지금까지 시도했습니다 무엇

: 데이터 세트에

  • 읽기 및 새 작업자 루틴을 분사, 각 라인 파티션이 볼 된 경우
  • 을 반복 (이 포함됩니다 a file/csv 작가). 회선을 chan []string으로 보냅니다.
  • 각 작업자는 파일 작성자이므로 해당 입력 채널에서 정확히 한 파티션의 라인 만 수신해야합니다.

주어진 줄에 표시된 파티션 값을 기반으로 올바른 작업자에게 줄을 보내는 방법을 알지 못해서 분명히 작동하지 않습니다.

나는 각 작업자 각 파티션 값에 대한 id string을 제공하지만 각 직원에 대해 별도의 chan []string를 생성하고 select로 해당 채널로 송신 할 필요가있는 경우 그 근로자에 ​​보낼 선택하는 방법을 인식하지 오전 한, 또는 구조체가 일종의 풀 및 라우팅 기능을 가진 각 작업자를 보유해야하는 경우.

TLDR; 조건부로 데이터를 특정 조건의 루틴이나 채널에 보내는 방법에 관해서는 분실했습니다. 고유 한 수는 임의적 일 수 있지만 24 개의 고유 한 파티션 값을 초과하지는 않는 몇 가지 범주적 인 string 값을 기반으로합니다.

나는 이런 질문에주의를 기울 였다는 것을주의해야 할 것입니다. 이것이 반대 투표를하기에 충분하지 않다고 생각한다면, 왜 그렇게 반복해서 피할 수 있는지에 대한 의견을 말하십시오. 위반.

미리 도움 주셔서 감사합니다.

Playground

발췌문 :

package main 

    import (
     "encoding/csv" 
     "fmt" 
     "log" 
     "strings" 
     "time" 
    ) 

    func main() { 

     // CSV 
     r := csv.NewReader(csvFile1) 
     lines, err := r.ReadAll() 
     if err != nil { 
      log.Fatalf("error reading all lines: %v", err) 
     } 

     // CHANNELS 
     lineChan := make(chan []string) 

     // TRACKER 
     var seenPartitions []string 

     for _, line := range lines { 

      hour := line[6] 
      if !stringInSlice(hour, seenPartitions) { 
       seenPartitions = append(seenPartitions, hour) 
       go worker(hour, lineChan) 
      } 
      // How to send to the correct worker/channel? 
      lineChan <- line 

     } 
     close(lineChan) 
    } 

    func worker(id string, lineChan <-chan []string) { 
     for j := range lineChan { 
      fmt.Println("worker", id, "started job", j) 
      // Write to a new file here and wait for input over the channel 
      time.Sleep(time.Second) 
      fmt.Println("worker", id, "finished job", j) 
     } 
    } 

    func stringInSlice(str string, list []string) bool { 
     for _, v := range list { 
      if v == str { 
       return true 
      } 
     } 
     return false 
    } 

    // DUMMY 
var csvFile1 = strings.NewReader(` 
12fy3,abc,def,2017,11,06,04 
fsdio,abc,def,2017,11,06,01 
11213,abc,def,2017,11,06,02 
1sdf9,abc,def,2017,11,06,01 
2123r,abc,def,2017,11,06,03 
1v2t3,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1d243,abc,def,2017,11,06,01 
1da23,abc,def,2017,11,06,04 
a1523,abc,def,2017,11,06,01 
12453,abc,def,2017,11,06,04`) 
+1

'map [string] (chan [] 문자열)'을 생성하십시오.파티션 키를 받으면 행을 해당 채널로 보낼 수 있습니다. – zerkms

+1

w/@ zerkms 동의. 또는 작업을 단순하게 유지하면서 유연성을 높이려면 각 작업자가 ID를 보유하는'struct' 유형의 인스턴스, 행을 전송하는 채널, 중지/플러시/닫기시기 및 필요한 다른 것을 알려주는 종료 채널을 만드십시오. , 그리고 나서'map [string] worker'를 잡고 올바른 작업자에게 올바른 라인을 보내기 위해 그것을 사용하십시오. – Adrian

+0

정말 고마워요! 이러한 제안 된 구조를 만들었지 만 확실하지는 않습니다. - 교착 상태를 방지하는 방법 -지도에 추가하는 방법 [string] 작업자가 계속 추적하도록 제안한대로 (이 workerPool이라고 함) - 여전히 방법 이미 만든 근로자 중에서 선택하십시오 현재 진행 상황 : https://play.golang.org/p/j56r_QvSJs – gpanda

답변

1

동기 버전이 더 먼저 동시 마법을 이동 (아래의 동시 버전을 참조).

package main 

import (
    "encoding/csv" 
    "fmt" 
    "io" 
    "log" 
    "strings" 
) 

func main() { 

    // CSV 
    r := csv.NewReader(csvFile1) 
    partitions := make(map[string][][]string) 

    for { 
     rec, err := r.Read() 
     if err != nil { 
      if err == io.EOF { 
       err = nil 

       save_partitions(partitions) 

       return 
      } 
      log.Fatal(err) 
     } 

     process(rec, partitions) 
    } 

} 

// prints only 
func save_partitions(partitions map[string][][]string) { 
    for part, recs := range partitions { 
     fmt.Println(part) 
     for _, rec := range recs { 
      fmt.Println(rec) 
     } 
    } 
} 

// this can also write/append directly to a file 
func process(rec []string, partitions map[string][][]string) { 
    l := len(rec) 
    part := rec[l-1] 
    if p, ok := partitions[part]; ok { 
     partitions[part] = append(p, rec) 
    } else { 
     partitions[part] = [][]string{rec} 
    } 
} 

// DUMMY 
var csvFile1 = strings.NewReader(` 
fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,01 
1d243,abc,def,2017,11,06,01 
1v2t3,abc,def,2017,11,06,01 
a1523,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
11213,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
2123r,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1da23,abc,def,2017,11,06,04 
12fy3,abc,def,2017,11,06,04 
12453,abc,def,2017,11,06,04`) 

https://play.golang.org/p/--iqZGzxCF

그리고 동시 버전 :

package main 

import (
    "encoding/csv" 
    "fmt" 
    "io" 
    "log" 
    "strings" 
    "sync" 
) 

var (
    // list of channels to communicate with workers 
    // workers accessed synchronousely no mutex required 
    workers = make(map[string]chan []string) 

    // wg is to make sure all workers done before exiting main 
    wg = sync.WaitGroup{} 

    // mu used only for sequential printing, not relevant for program logic 
    mu = sync.Mutex{} 
) 

func main() { 

    // wait for all workers to finish up before exit 
    defer wg.Wait() 

    r := csv.NewReader(csvFile1) 

    for { 
     rec, err := r.Read() 
     if err != nil { 
      if err == io.EOF { 
       savePartitions() 
       return 
      } 
      log.Fatal(err) // sorry for the panic 
     } 
     process(rec) 
    } 

} 

func process(rec []string) { 
    l := len(rec) 
    part := rec[l-1] 

    if c, ok := workers[part]; ok { 
     // send rec to worker 
     c <- rec 
    } else { 
     // if no worker for the partition 

     // make a chan 
     nc := make(chan []string) 
     workers[part] = nc 

     // start worker with this chan 
     go worker(nc) 

     // send rec to worker via chan 
     nc <- rec 
    } 
} 

func worker(c chan []string) { 

    // wg.Done signals to main worker completion 
    wg.Add(1) 
    defer wg.Done() 

    part := [][]string{} 
    for { 
     // wait for a rec or close(chan) 
     rec, ok := <-c 
     if ok { 
      // save the rec 
      // instead of accumulation in memory 
      // this can be saved to file directly 
      part = append(part, rec) 
     } else { 
      // channel closed on EOF 

      // dump partition 
      // locks ensures sequential printing 
      // not a required for independent files 
      mu.Lock() 
      for _, p := range part { 
       fmt.Printf("%+v\n", p) 
      } 
      mu.Unlock() 

      return 
     } 
    } 
} 

// simply signals to workers to stop 
func savePartitions() { 
    for _, c := range workers { 
     // signal to all workers to exit 
     close(c) 
    } 
} 

// DUMMY 
var csvFile1 = strings.NewReader(` 
fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,01 
1d243,abc,def,2017,11,06,01 
1v2t3,abc,def,2017,11,06,01 
a1523,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
11213,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
2123r,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1da23,abc,def,2017,11,06,04 
12fy3,abc,def,2017,11,06,04 
12453,abc,def,2017,11,06,04`) 

https://play.golang.org/p/oBTPosy0yT

재밌게!

+0

내 유스 케이스에서 어떻게 작동하는지 다시 한 번 살펴보고 질문이 있으면 다시보고합니다. 감사! – gpanda

+0

두 가지 솔루션 모두 훌륭하지만 두 번째 솔루션은 특히 뛰어납니다. 나는 특히 worker,()에서 chan을 닫는 것에 대한 ok 문법과 설명문을 감사하게 생각한다. 독립적 인 파일에 Lock()이 필요하지 않은 방법에 대한 의견이 궁금합니다. 나는 이것이 각 파티션을위한 별도의 csv에 쓰기 위해 적응할 곳이라고 생각한다 - 각 파티션을 파일에 쓰기 위해 여기서는 잠금이 필요하지 않을까? – gpanda

+1

@gpanda 네, Locking 할 필요가 없다고 확신합니다. 인쇄 할 때 인쇄 할 단일 공유 자원을 표준 출력으로 가지고 있기 때문에 필요합니다. 'mu.Lock and mu.Unlock'을 주석 처리하십시오. 선의 순서가 무작위가 아니라는 점을 제외하면 나쁜 것은 없습니다. 이 자물쇠로 각 노동자는 "말하는 동안 모든 사람을 쳐다 보며"라고 말하고 있습니다. 작업자 1 명당 하나의 파일을 가지고있을 때는 아무것도 공유하지 않으므로 part : = [] [] string {}'을 파일로 생각할 수 있습니다. 그리고 우리는 그것을 추가하기 위해 그것을 잠글 수 없습니다. – biosckon