2017-05-08 3 views
6

에서 일률적 인 작업 배포로 인해 일찍 죽어가는 노동자 docker swarm 클러스터에 배포 된 간단한 파이프 라인을 실행하려고합니다. 루이지 노동자는 복제 된 도커 서비스로 배치됩니다. 그들은 성공적으로 시작하고 luigi-server에 대한 작업을 요청한 후 몇 초가 지나면 작업이 할당되지 않아 죽기 시작하고 모든 작업은 단일 작업자에게 할당됩니다.Luigi (2.6.1)

근로자의 luigi.cfg에서 keep_alive = True로 설정하여 강제로 죽지 않도록해야하지만 파이프 라인이 완료된 후에 근로자를 지키려는 것이 좋습니다. 작업 배포를 제어하는 ​​방법이 있습니까?

우리의 테스트 파이프 라인 :

class RunAllTasks(luigi.Task): 

    tasks = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    def requires(self): 
     for i in range(self.tasks): 
      yield RunExampleTask(i, self.sleep_time) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write('All done!') 

    def output(self): 
     return LocalTarget('/data/RunAllTasks.txt') 


class RunExampleTask(luigi.Task): 

    number = luigi.IntParameter() 
    sleep_time = luigi.IntParameter() 

    @property 
    def cmd(self): 
     return """ 
       docker run --rm --name example_{number} hello-world 
      """.format(number=self.number) 

    def run(self): 
     time.sleep(self.sleep_time) 
     logger.debug(self.cmd) 
     out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True) 
     logger.debug(out) 
     with self.output().open('w') as f: 
      f.write(str(out)) 

    def output(self): 
     return LocalTarget('/data/{number}.txt'.format(number=self.number)) 


if __name__ == "__main__": 
    luigi.run() 
+0

항의 자 질문이 아닌 것 같습니까? – johnharris85

+0

각 노드에서'RunAllTasks'를 시작 하시겠습니까? – MattMcKnight

+0

@MattMcKnight 예, 파이프 라인을 도커 서비스로 캡슐화했기 때문에 웜은 각 노드에서 복제본을 시작합니다 (라운드 로빈). – fcisneros

답변

1

문제는 한 번에 하나의 요구 사항을 보내고 yield의 결과이며, 다음과 같이 대신에 당신이 한 번에 yield에 그들 모두를 원하는 :

def requires(self): 
    reqs = [] 
    for i in range(self.tasks): 
     reqs.append(RunExampleTask(i, self.sleep_time)) 
    yield reqs