1

저는 파이프 라인 작성시 ruffus를 사용하고 있습니다. 병렬 함수를 여러 번 호출하고 여러 파일을 생성하는 함수가 있습니다. 나는 모든 파일이 만들어진 후에 호출되는 "combineFiles()"함수를 만들고 싶습니다. 클러스터에서 병렬로 실행되기 때문에 모두 함께 끝나지는 않습니다. 내가 작성해야 할 파일 이름 집합을 반환하는 함수 'getFilenames()'를 작성했지만, 어떻게 그들을 결합 파일() 기다릴 수 있습니까?Ruffus 파이프 라인에서 기능을 실행하기 전에 일련의 파일을 작성해야합니다.

@pipelineFunction 
@files(getFilenames) 
def combineFiles(filenames): 
    # I should only be called if every file in the list 'filenames' exists 

나는 또한 장식 시도했다 : :

@merge(getFilenames) 

을하지만이 중 하나가 작동하지 않습니다

나는 다음 시도했다. combineFiles는 여전히 getFilenames에 의해 주어진 파일이 만들어지기 전에 오류가 발생한다. 어떻게하면 해당 파일에 combineFiles 조건부를 만들 수 있습니까?

감사합니다.

답변

2

나는 Ruffus 개발자입니다. 나는 당신이 무엇을하려고하는지 완전히 이해하지 못하고 있지만 여기에 간다 :

파이프 라인의 다음 단계를 실행하기 위해 끝내는 데 다른 시간이 걸리는 작업 대기는 Ruffus가하는 것과 정확히 같습니다. 이것은 잘하면 간단합니다.

첫 번째 질문은 파이프 라인이 실행되기 전에 어떤 파일이 만들어지고 있는지 아십니까? 당신이하는 것으로 가정함으로써 시작할 수 있습니다.

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

파일을 호출 할 때마다 더미 함수를 작성하겠습니다. Ruffus에서 입력 및 출력 파일 이름은 처음 두 매개 변수에 각각 들어 있습니다. 우리는 입력 된 파일 이름이 없다, 그래서 우리의 함수 호출은 다음과 같아야합니다

create_file(None, "one.file") 
create_file(None, "two.file") 
create_file(None, "three.file") 

create_file의 정의는 다음과 같을 것이다 :

이 파일은 각각 3 개 별도의 통화에서 생성 될 수
@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    open(output_file_name, "w").write("dummy file") 

create_file에. 원하는 경우 병렬로 실행할 수 있습니다.

pipeline_run([create_file], multiprocess = 5) 

이제 파일을 결합하십시오. "@Merge"데코레이터는 실제로이를 위해 정확하게 설정됩니다. 우리는 이전의 기능에 그것을 연결해야합니다 모든 파일이 create_file 할 세 가지 통화에서 준비가되면

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 

이 만 해당) (merge_file를 호출합니다. 다음과 같이

전체 코드는 다음과 같습니다

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

from random import randint 
from time import sleep 

@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    # simulate create file process of indeterminate complexity 
    sleep(randint(1,5)) 
    open(output_file_name, "w").write("dummy file") 

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 


pipeline_run([merge_file], multiprocess = 5) 

을 그리고 이것은 결과입니다

>>> pipeline_run([merge_file], multiprocess = 5) 

    Job = [None -> two.file] completed 
    Job = [None -> three.file] completed 
    Job = [None -> one.file] completed 
Completed Task = create_file 
    Job = [[one.file, three.file, two.file] -> merge.file] completed 
Completed Task = merge_file