2017-10-02 11 views
0

공기 흐름 구현시 다중 작업자 데이터가 있습니다. 는 DAG-A 운영자 T1, T2를 가지고 말을 순차적으로 실행하도록 설정되어 T3을 수 있습니다 (예. T2는 T1에 의존하고, T3는 T2에 따라 달라집니다.)실행중인 인스턴스의 모든 작업이 완료 될 때까지 다른 인스턴스가 인스턴스화되지 않도록 다중 운영자 dag를 설정하는 방법은 무엇입니까?

task_2.set_upstream(task_1) 
task_3.set_upstream(task_2) 

것은 우리는 때 보험에 가입 할 필요가 DAG-A가 성공적으로 같은 DAG의 다른 인스턴스 전에 완료 모든 작업을 인스턴스화, 인스턴스화

우리는 우리의 DAG를에 다음과 같은 설정

(또는 다음 DAG 인스턴스에서 첫 번째 작업이 트리거되기 전에.) :

da['depends_on_past'] = True 

지금 무슨 일이 일어나고 있는지 그쪽은 인스턴스화 된 dag에 오류가 없으면 원하는 효과가 나타납니다.
그러나 dag-a가 매시간 실행되도록 예약되어 있다고 가정 해 보겠습니다. 시간에 dag-a-i1 인스턴스는 예정대로 트리거됩니다. 그런 다음 dag-a-i1 작업 t1이 성공적으로 실행되고 t2가 실행을 시작하고 실패합니다. 이 시나리오에서는 예상대로 dag-a-i1 인스턴스가 중지됩니다. 다음 시간이 오면 dag-a-i2 인스턴스가 트리거되고 dag 인스턴스 (i2)에 대한 작업 t1이 실행을 시작하고 완료를 말한 다음 dag-a-i2가 중지됩니다. t2 (dag-a-i1의 경우)의 이전 인스 턴스가 실패한 상태이기 때문에 실행됩니다.

우리가 관찰해야 할 행동은 두 번째 인스턴스가 트리거되지 않았거나 트리거되면 트리거 된 두 번째 인스턴스에 대한 작업 t1을보고 싶지 않다는 것입니다. 이것은 우리에게 문제를 일으키는 것입니다.

도움을 주시면 감사하겠습니다.

답변

0

답변을 시작하기 전에 질문에서 제시 한 이름 지정 규칙과 다른 이름 지정 규칙을 설정합니다.

DagA.TimeA.T1A 일 때 T1 태스크를 실행하는 DAG A의 인스턴스를 참조합니다.

계속 진행하면 두 가지 해결책이 있습니다.

첫 번째 :

특히 예쁜하지 않지만, 당신은 당신의 DAG의 시작 부분에 센서 작업을 추가 할 수 있습니다. 이 센서는 동일한 DAG의 최종 작업 실행을 기다려야합니다. 다음과 같은 뭔가 작업을해야합니다 :

from airflow import DAG 
from airflow.operators.dummy_operator import DummyOperator 
from airflow.operators.sensors import ExternalTaskSensor 
from datetime import timedelta 

dag = DAG(dag_id="ETL", schedule_interval="@hourly") 
ensure_prior_success = ExternalTaskSensor(external_dag_id="ETL", 
external_task_id="final_task", execution_delta=timedelta(hours=1)) 
final_task = DummyOperator(task_id="final_task", dag=dag) 

은 비 센서 작업 중 하나가 DagA.TimeA 실행 중에 실패하는 경우, DagA.TimeB는 센서 작업을 실행을 시작 하겠지만 결국 시간 제한됩니다,이 방법을 쓰는.

이렇게하면 DAG를 작성하는 경우주의해야 할 몇 가지 사항이 있습니다.

  1. (당신은 당신이 이제까지 생각되면, 나)이 DAG의 채워 넣을 수행하는 방법에 대한 계획하는 경우에, 당신은 저 번호로 DAG의 max_active_runs을 설정해야합니다. 그 이유는 충분히 큰 백필이 글로벌 작업 큐를 센서 작업으로 채우고 새 작업을 대기시킬 수없는 상황을 만들 수 있기 때문입니다.

  2. 이 DAG의 첫 번째 실행에는 사람의 개입이 필요합니다. 사람은 초기 센서 작업을 성공으로 표시해야합니다 (이전 실행이 없기 때문에 센서를 성공적으로 완료 할 수 없기 때문입니다).

두 번째

:

나는 당신의 작업을 수행하는 일을 무엇인지 잘 모르겠지만, 예을 위해의 그들이 데이터베이스에 기록 포함 가정 해 봅시다. DagA.TimeA.T3이 성공적으로 완료되었다는 증거를 데이터베이스에서 찾는 연산자를 만듭니다.

내가 말했듯이, 귀하의 작업이 무엇을하고 있는지 알지 못하면이 운영자의 모습을 구체적으로 조언하기가 어렵습니다. 유스 케이스에 데이터베이스 쓰기 횟수가 일정한 경우 쿼리를 수행하여 대상 테이블 WHERE TIME <= NOW - 1 HOUR에있는 문서 수를 계산할 수 있습니다.