2017-10-18 7 views
1

ExternalTaskSensor를 사용하려고하는데 이미 성공적으로 완료된 다른 DAG의 작업을 파킹하는 경우가 발생합니다.기류 ExternalTaskSensor가 꽂히는 경우

여기서 첫 번째 DAG "a"가 작업을 완료 한 후 ExternalTaskSensor를 통해 두 번째 DAG "b"가 트리거 된 것으로 가정합니다. 대신 a.first_task를 파고 들기 시작합니다.

먼저 DAG :

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 

dag = DAG(
    dag_id='a', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_first_task(): 
    print('First task is done') 

PythonOperator(
    task_id='first_task', 
    python_callable=do_first_task, 
    dag=dag) 

두 번째 DAG : 내가 무슨

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
from airflow.operators.sensors import ExternalTaskSensor 

dag = DAG(
    dag_id='b', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_second_task(): 
    print('Second task is done') 

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed', 
    external_dag_id='a', 
    external_task_id='first_task', 
    dag=dag) >> \ 
PythonOperator(
    task_id='second_task', 
    python_callable=do_second_task, 
    dag=dag) 

를 놓친 거지?

답변

1

ExternalTaskSensor은 동일한 실행 날짜로 실행되는 dag의 작업에 의존한다고 가정합니다.

즉, ab의 데이터는 동일한 일정 (예 : 매일 오전 9시 또는 w/e)으로 실행해야합니다.

그렇지 않으면 ExternalTaskSensor을 인스턴스화 할 때 execution_delta 또는 execution_date_fn을 사용해야합니다. 여기

더욱 명확히 돕는 오퍼레이터 자체 내부 문서이다

:param execution_delta: time difference with the previous execution to 
    look at, the default is the same execution_date as the current task. 
    For yesterday, use [positive!] datetime.timedelta(days=1). Either 
    execution_delta or execution_date_fn can be passed to 
    ExternalTaskSensor, but not both. 

:type execution_delta: datetime.timedelta 


:param execution_date_fn: function that receives the current execution date 
    and returns the desired execution date to query. Either execution_delta 
    or execution_date_fn can be passed to ExternalTaskSensor, but not both. 

:type execution_date_fn: callable