저의 목표는 공통 포맷을 공유하는 하나 또는 여러 개의 csv 파일을 읽고 csv 데이터의 파티션 컬럼을 기반으로 개별 파일에 작성하는 것입니다. 마지막 열이 파티션이고 데이터가 정렬되지 않고 주어진 파티션이 여러 파일에서 발견되도록 허용하십시오. 하나 개의 파일의 예 :이 방법은 지칠대로 지친 XY 문제 냄새Golang의 파티션 컬럼에서 동시에 여러 csv 파일을 작성하십시오.
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
경우에, 나는 조정 드리겠습니다. 내가 지금까지 시도했습니다 무엇
: 데이터 세트에
- 읽기 및 새 작업자 루틴을 분사, 각 라인 파티션이 볼 된 경우
- 을 반복 (이 포함됩니다 a file/csv 작가). 회선을
chan []string
으로 보냅니다. - 각 작업자는 파일 작성자이므로 해당 입력 채널에서 정확히 한 파티션의 라인 만 수신해야합니다.
주어진 줄에 표시된 파티션 값을 기반으로 올바른 작업자에게 줄을 보내는 방법을 알지 못해서 분명히 작동하지 않습니다.
나는 각 작업자 각 파티션 값에 대한 id string
을 제공하지만 각 직원에 대해 별도의 chan []string
를 생성하고 select
로 해당 채널로 송신 할 필요가있는 경우 그 근로자에 보낼 선택하는 방법을 인식하지 오전 한, 또는 구조체가 일종의 풀 및 라우팅 기능을 가진 각 작업자를 보유해야하는 경우.
TLDR; 조건부로 데이터를 특정 조건의 루틴이나 채널에 보내는 방법에 관해서는 분실했습니다. 고유 한 수는 임의적 일 수 있지만 24 개의 고유 한 파티션 값을 초과하지는 않는 몇 가지 범주적 인 string
값을 기반으로합니다.
나는 이런 질문에주의를 기울 였다는 것을주의해야 할 것입니다. 이것이 반대 투표를하기에 충분하지 않다고 생각한다면, 왜 그렇게 반복해서 피할 수 있는지에 대한 의견을 말하십시오. 위반.
미리 도움 주셔서 감사합니다.
발췌문 :
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS
lineChan := make(chan []string)
// TRACKER
var seenPartitions []string
for _, line := range lines {
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
close(lineChan)
}
func worker(id string, lineChan <-chan []string) {
for j := range lineChan {
fmt.Println("worker", id, "started job", j)
// Write to a new file here and wait for input over the channel
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
}
}
func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
'map [string] (chan [] 문자열)'을 생성하십시오.파티션 키를 받으면 행을 해당 채널로 보낼 수 있습니다. – zerkms
w/@ zerkms 동의. 또는 작업을 단순하게 유지하면서 유연성을 높이려면 각 작업자가 ID를 보유하는'struct' 유형의 인스턴스, 행을 전송하는 채널, 중지/플러시/닫기시기 및 필요한 다른 것을 알려주는 종료 채널을 만드십시오. , 그리고 나서'map [string] worker'를 잡고 올바른 작업자에게 올바른 라인을 보내기 위해 그것을 사용하십시오. – Adrian
정말 고마워요! 이러한 제안 된 구조를 만들었지 만 확실하지는 않습니다. - 교착 상태를 방지하는 방법 -지도에 추가하는 방법 [string] 작업자가 계속 추적하도록 제안한대로 (이 workerPool이라고 함) - 여전히 방법 이미 만든 근로자 중에서 선택하십시오 현재 진행 상황 : https://play.golang.org/p/j56r_QvSJs – gpanda