원시 데이터를 작은 파일로 분할하는 luigi
전처리 작업이 있습니다. 이 파일들은 실제 파이프 라인에 의해 처리됩니다.런타임에 루이지 종속성이 변경됨
매개 변수와 관련하여 매개 변수로 하나의 전처리 된 파일 ID가있는 각 파이프 라인을 필요로합니다. 그러나이 파일 ID는 사전 처리 단계에서만 생성되므로 런타임시에만 알 수 있습니다. , Experiment
이
첫 번째는, 어떻게 든 두 번째 문서
에 원시 데이터의 분할을 요구해야
import luigi import subprocess import random class GenPipelineFiles(luigi.Task): input_file = luigi.Parameter() def requires(self): pass def output(self): for i in range(random.randint(0,10)): yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) def run(self): for iout in self.output: command = "touch {}".format(iout.fname) subprocess.call(command, shell=True) class RunPipelineOnSmallChunk(luigi.Task): pass class Experiment(luigi.WrapperTask): input_file = luigi.Parameter(default="ex") def requires(self): file_ids = GenPipelineFiles(input_file=self.input_file) for file_id in file_ids: yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) luigi.run()
래퍼 작업 : 내 아이디어를 설명하기 위해 나는이-작동하지 않는 코드를 제공 사전 처리의 획득 된 파일 ID로 실제 파이프 라인이 필요합니다.
GenPipelineFiles
출력 파일
난수는이 하드 코딩 될 수없는 것을 나타내는 Experiment
의 requires
.
이 질문과 관련있는 질문은 luigi
작업에는 입력 대상과 출력 대상이 하나만 적절하다는 것입니다. 아마도 GenPipelineFiles
에서 여러 출력을 모델링하는 방법에 대한 참고 사항도 문제를 해결할 수 있습니다.
당신이이 시점에서 점점 오류를 설명 할 수 sampletask.py? –
luigi 의존성 그래프는'requires' 함수의 반환을 기반으로 생성됩니다. 여기에서 GePipelineFiles는 반환되지 않으므로 예약되지 않습니다. 이 코드는 실제 코드가 아니며 결코 오류없이 실행되지 않습니다. 그것은 내가 직면하는 의존성 문제의 설명 목적을위한 것일뿐입니다. –