동시에 여러 Luigi 작업 흐름을 실행할 때, 작업자 수가 합산됩니다. 즉, 두 개의 워크 플로를 함께 실행하고 작업자 수가 n으로 설정된 경우 luigi.cfg 파일에서 워크 플로가 동시에 n 명 이상의 작업자를 사용하는 경우 중앙 스케줄러는 2xn 근로자.여러 스크립트가 동시에 실행될 때 Luigi 작업자 수 제한
Luigi의 설명서에서 동시에 12 개의 워크 플로를 실행하더라도 직원 수를 n으로 제한 할 수있는 방법을 찾지 못했습니다.
이
이 내가 사용하고있는 예제 스크립트는[core]
workers: 3
내 luigi.cfg 파일입니다 (이것은 sciluigi (루이지의 상단에 레이어의 사실 사용한다)하지만 난 그게하게 생각하지 않는다 태스크 및 스케줄러 구성에 관한 차이). 나는 여러 번 함께 실행할 때 3 개의 마지막 워크 플로가 시작되기 전에 세 개의 첫 번째 워크 플로가 완료되기를 기다린다.
import optparse
import luigi
import sciluigi
import random
import time
import sys
import os
import subprocess
class MyFooWriter(sciluigi.Task):
# We have no inputs here
# Define outputs:
outdir = sciluigi.Parameter();
def out_foo(self):
return sciluigi.TargetInfo(self, os.path.join(self.outdir,'foo.txt'))
def run(self):
with self.out_foo().open('w') as foofile:
foofile.write('foo\n')
class MyFooReplacer(sciluigi.Task):
replacement = sciluigi.Parameter() # Here, we take as a parameter
# what to replace foo with.
outFile = sciluigi.Parameter();
outdir = sciluigi.Parameter();
# Here we have one input, a "foo file":
in_foo = None
# ... and an output, a "bar file":
def out_replaced(self):
return sciluigi.TargetInfo(self, os.path.join(self.outdir, self.outFile))
def run(self):
replacement = ""
with open(self.in_foo().path, 'r') as content_file:
content = content_file.read()
replacement = content.replace('foo', self.replacement)
for i in range(1,30):
sys.stderr.write(str(i)+"\n")
time.sleep(1)
with open(self.out_replaced().path,'w') as out_f:
out_f.write(replacement)
class MyWorkflow(sciluigi.WorkflowTask):
outdir = luigi.Parameter()
def workflow(self):
#rdint = random.randint(1,1000)
rdint = 100
barfile = "foobar_" + str(rdint) +'.bar.txt'
foowriter = self.new_task('foowriter', MyFooWriter, outdir = self.outdir)
fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar', outFile = barfile, outdir = self.outdir)
fooreplacer.in_foo = foowriter.out_foo
return fooreplacer
# End of script ....
if __name__ == '__main__':
parser = optparse.OptionParser()
parser.add_option('-d', dest = "outdir", action="store", default=".")
options, remainder = parser.parse_args()
params = {"outdir" : options.outdir}
wf = [MyWorkflow(outdir = options.outdir)]
luigi.build(wf)
이것은 내가 동시에 스크립트를 실행 (펄, 내가 제일 좋아하는 언어 :-))하는 데 사용하는 작은 펄 스크립트입니다.
#! /usr/bin/perl
use strict;
for (my $i = 0; $i < 6; $i++) {
my $testdir = "test".$i;
system("mkdir -p $testdir");
system("python run_sciluigi.py -d $testdir&");
sleep (2)
}
안녕 매트, 많은 감사합니다. 이것이 내가하는 일입니다. 이 질문은 주로 근로자 개념에 대한 오해에서 왔습니다. 나는 직원이 파이프 라인이 함께 작동 할 수있는 작업의 수를 알았지 만 자원은 노동자의 수와 상관없이 사용할 수있는 힘이다. –