2017-11-14 14 views
1

동시에 여러 Luigi 작업 흐름을 실행할 때, 작업자 수가 합산됩니다. 즉, 두 개의 워크 플로를 함께 실행하고 작업자 수가 n으로 설정된 경우 luigi.cfg 파일에서 워크 플로가 동시에 n 명 이상의 작업자를 사용하는 경우 중앙 스케줄러는 2xn 근로자.여러 스크립트가 동시에 실행될 때 Luigi 작업자 수 제한

Luigi의 설명서에서 동시에 12 개의 워크 플로를 실행하더라도 직원 수를 n으로 제한 할 수있는 방법을 찾지 못했습니다.

이 내가 사용하고있는 예제 스크립트는

[core] 
workers: 3 

내 luigi.cfg 파일입니다 (이것은 sciluigi (루이지의 상단에 레이어의 사실 사용한다)하지만 난 그게하게 생각하지 않는다 태스크 및 스케줄러 구성에 관한 차이). 나는 여러 번 함께 실행할 때 3 개의 마지막 워크 플로가 시작되기 전에 세 개의 첫 번째 워크 플로가 완료되기를 기다린다.

import optparse 
import luigi 
import sciluigi 
import random 
import time 
import sys 
import os 
import subprocess 


class MyFooWriter(sciluigi.Task): 
    # We have no inputs here 
    # Define outputs: 
    outdir = sciluigi.Parameter(); 
    def out_foo(self): 
     return sciluigi.TargetInfo(self, os.path.join(self.outdir,'foo.txt')) 
    def run(self): 
     with self.out_foo().open('w') as foofile: 
      foofile.write('foo\n') 

class MyFooReplacer(sciluigi.Task): 
    replacement = sciluigi.Parameter() # Here, we take as a parameter 
            # what to replace foo with. 
    outFile = sciluigi.Parameter(); 
    outdir = sciluigi.Parameter(); 
    # Here we have one input, a "foo file": 
    in_foo = None 

    # ... and an output, a "bar file": 
    def out_replaced(self): 
     return sciluigi.TargetInfo(self, os.path.join(self.outdir, self.outFile)) 

    def run(self): 
     replacement = "" 
     with open(self.in_foo().path, 'r') as content_file: 
      content = content_file.read() 
      replacement = content.replace('foo', self.replacement) 
      for i in range(1,30): 
       sys.stderr.write(str(i)+"\n") 
       time.sleep(1) 
     with open(self.out_replaced().path,'w') as out_f: 
      out_f.write(replacement) 



class MyWorkflow(sciluigi.WorkflowTask): 
    outdir = luigi.Parameter() 
    def workflow(self): 
     #rdint = random.randint(1,1000) 
     rdint = 100 
     barfile = "foobar_" + str(rdint) +'.bar.txt' 
     foowriter = self.new_task('foowriter', MyFooWriter, outdir = self.outdir) 
     fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar', outFile = barfile, outdir = self.outdir) 
     fooreplacer.in_foo = foowriter.out_foo 
     return fooreplacer 


# End of script .... 
if __name__ == '__main__': 
    parser = optparse.OptionParser() 
    parser.add_option('-d', dest = "outdir", action="store", default=".") 
    options, remainder = parser.parse_args() 
    params = {"outdir" : options.outdir}  
    wf = [MyWorkflow(outdir = options.outdir)] 
    luigi.build(wf) 

이것은 내가 동시에 스크립트를 실행 (펄, 내가 제일 좋아하는 언어 :-))하는 데 사용하는 작은 펄 스크립트입니다.

#! /usr/bin/perl 

use strict; 

for (my $i = 0; $i < 6; $i++) { 
    my $testdir = "test".$i; 
    system("mkdir -p $testdir"); 
    system("python run_sciluigi.py -d $testdir&"); 
    sleep (2) 
} 

답변

0

정확히 노동자 제한, 그것은 동시 실행에 글로벌 제한을 넣어 resources 개념을 사용하는 것이 가능하지만. 당신의 모든 작업에서 luigi.cfg

[resources] 
max_workers=5 

에서

: 귀하의 답변에 대한

class MyFooReplacer(sciluigi.Task): 
    resources = {'max_workers': 1} 

http://luigi.readthedocs.io/en/stable/configuration.html#resources

+0

안녕 매트, 많은 감사합니다. 이것이 내가하는 일입니다. 이 질문은 주로 근로자 개념에 대한 오해에서 왔습니다. 나는 직원이 파이프 라인이 함께 작동 할 수있는 작업의 수를 알았지 만 자원은 노동자의 수와 상관없이 사용할 수있는 힘이다. –