1
ExternalTaskSensor를 사용하려고하는데 이미 성공적으로 완료된 다른 DAG의 작업을 파킹하는 경우가 발생합니다.기류 ExternalTaskSensor가 꽂히는 경우
여기서 첫 번째 DAG "a"가 작업을 완료 한 후 ExternalTaskSensor를 통해 두 번째 DAG "b"가 트리거 된 것으로 가정합니다. 대신 a.first_task를 파고 들기 시작합니다.
먼저 DAG :
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
두 번째 DAG : 내가 무슨
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
를 놓친 거지?