2017-02-21 9 views
0

원시 데이터를 작은 파일로 분할하는 luigi 전처리 작업이 있습니다. 이 파일들은 실제 파이프 라인에 의해 처리됩니다.런타임에 루이지 종속성이 변경됨

매개 변수와 관련하여 매개 변수로 하나의 전처리 된 파일 ID가있는 각 파이프 라인을 필요로합니다. 그러나이 파일 ID는 사전 처리 단계에서만 생성되므로 런타임시에만 알 수 있습니다. , Experiment

  1. 첫 번째는, 어떻게 든 두 번째 문서

  2. 에 원시 데이터의 분할을 요구해야

    import luigi 
    import subprocess 
    import random 
    
    
    class GenPipelineFiles(luigi.Task): 
    
        input_file = luigi.Parameter() 
    
        def requires(self): 
         pass 
    
        def output(self): 
    
         for i in range(random.randint(0,10)): 
          yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) 
    
        def run(self): 
    
         for iout in self.output: 
          command = "touch {}".format(iout.fname) 
          subprocess.call(command, shell=True) 
    
    
    class RunPipelineOnSmallChunk(luigi.Task): 
        pass 
    
    
    class Experiment(luigi.WrapperTask): 
    
        input_file = luigi.Parameter(default="ex") 
    
        def requires(self): 
    
         file_ids = GenPipelineFiles(input_file=self.input_file) 
    
         for file_id in file_ids: 
          yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) 
    
    
    luigi.run() 
    

    래퍼 작업 : 내 아이디어를 설명하기 위해 나는이-작동하지 않는 코드를 제공 사전 처리의 획득 된 파일 ID로 실제 파이프 라인이 필요합니다. GenPipelineFiles 출력 파일

난수는이 하드 코딩 될 수없는 것을 나타내는 Experimentrequires.

이 질문과 관련있는 질문은 luigi 작업에는 입력 대상과 출력 대상이 하나만 적절하다는 것입니다. 아마도 GenPipelineFiles에서 여러 출력을 모델링하는 방법에 대한 참고 사항도 문제를 해결할 수 있습니다.

+0

당신이이 시점에서 점점 오류를 설명 할 수 sampletask.py? –

+0

luigi 의존성 그래프는'requires' 함수의 반환을 기반으로 생성됩니다. 여기에서 GePipelineFiles는 반환되지 않으므로 예약되지 않습니다. 이 코드는 실제 코드가 아니며 결코 오류없이 실행되지 않습니다. 그것은 내가 직면하는 의존성 문제의 설명 목적을위한 것일뿐입니다. –

답변

0

여러 출력을 처리하는 간단한 방법 중 하나는 입력 파일 다음에 이름 지정된 디렉터리를 만들고이 파일의 분할 된 출력 파일을 입력 파일의 이름이 지정된 디렉터리에 저장하는 것입니다. 그런 식으로 종속 작업은 디렉터리의 존재 여부 만 확인할 수 있습니다. 입력 파일 123.txt가 있다고 가정 해 봅시다. 그런 다음 파일 123.32.15, 2.txt, 3.txt를 GenPipelineFiles의 출력으로 만든 다음 123_processed with 1.txt, 2.txt, 3.txt는 RunPipelineOnSmallChunk의 출력입니다.

requires 메서드가 Experiment 인 경우 실행하려는 작업을 예를 들어 목록으로 반환해야합니다. file_ids = GenPipelineFiles(input_file=self.input_file)을 작성한 방법은 메서드에 의해 반환되지 않기 때문에 해당 개체의 run 메서드가 호출되지 않는다고 생각하게 만듭니다.

여기에 파일별로 대상과 함께 작동하는 샘플 코드가 있습니다 (파일 당 작업은 아닙니다). 나는 여전히 당신이 완료되었음을 나타 내기 위해 어떤 종류의 디렉토리 또는 센티넬 파일의 단일 출력 대상을 갖는 것이 더 안전하다고 생각합니다. 작업을 통해 각 대상이 생성되지 않는 한 원자 성은 손실됩니다.

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler 

import luigi 
import os 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    inputfile = luigi.Parameter() 
    num_targets = random.randint(0,10) 

    def requires(self): 
     pass 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "split_{}".format(self.get_prefix()) 

    def output(self): 
     targets = [] 
     for i in range(self.num_targets): 
      targets.append(luigi.LocalTarget(" {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 

    inputfile = luigi.Parameter(default="test") 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "processed_{}".format(self.get_prefix()) 

    @staticmethod 
    def clean_input_path(path): 
     return path.replace("split", "processed") 

    def requires(self): 
     return GenPipelineFiles(self.inputfile) 

    def output(self): 
     targets = [] 
     for target in self.input(): 
      targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 
+0

답장을 보내 주셔서 감사합니다. 귀하의 제안에서 최적이 아니라고 생각하는 두 가지가 있습니다. 그러나 아마도 당신의 대답을 오해했을 것입니다. 1. RunPipelineOnSmallChunk가 전체 디렉토리의'RunPipelineOnSmallChunk' 태스크는 원자가가 아니라 필요한 계산을 수행하기 위해 모든 파일을 반복해야합니다. 2.'Experiment'에서 두 가지 유형의 태스크가 필요하다면 luigi 스케줄러는 불필요하게 하위 태스크를 여러 번 실행하려고 시도 할 수 있습니다. –

+0

2의 경우 실험에'RunPipelineOnSmallChunk'가 필요하고'RunPipelineOnSmallChunk'에는'GenPipelineFiles'가 필요합니다. – MattMcKnight

+0

코드가 수정 된 버전으로 다시 추가되었습니다. – MattMcKnight