나는 Ruffus 개발자입니다. 나는 당신이 무엇을하려고하는지 완전히 이해하지 못하고 있지만 여기에 간다 :
파이프 라인의 다음 단계를 실행하기 위해 끝내는 데 다른 시간이 걸리는 작업 대기는 Ruffus가하는 것과 정확히 같습니다. 이것은 잘하면 간단합니다.
첫 번째 질문은 파이프 라인이 실행되기 전에 어떤 파일이 만들어지고 있는지 아십니까? 당신이하는 것으로 가정함으로써 시작할 수 있습니다.
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
파일을 호출 할 때마다 더미 함수를 작성하겠습니다. Ruffus에서 입력 및 출력 파일 이름은 처음 두 매개 변수에 각각 들어 있습니다. 우리는 입력 된 파일 이름이 없다, 그래서 우리의 함수 호출은 다음과 같아야합니다
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
create_file의 정의는 다음과 같을 것이다 :
이 파일은 각각 3 개 별도의 통화에서 생성 될 수
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
create_file에. 원하는 경우 병렬로 실행할 수 있습니다.
pipeline_run([create_file], multiprocess = 5)
이제 파일을 결합하십시오. "@Merge"데코레이터는 실제로이를 위해 정확하게 설정됩니다. 우리는 이전의 기능에 그것을 연결해야합니다 모든 파일이 create_file 할 세 가지 통화에서 준비가되면
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
이 만 해당) (merge_file를 호출합니다. 다음과 같이
전체 코드는 다음과 같습니다
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
을 그리고 이것은 결과입니다
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file