2017-11-28 13 views
0

depends_on_past을 DagRun 전체에 사용하는 기류가 작업에만 적용되는 것이 아닌가요?전체 DAG에 대한 공기 흐름 depends_on_past

매일 DAG가 있고 금요일 DagRun은 4 번째 작업에서 오류가 발생하지만 토요일과 일요일 DagRuns는 여전히 예정대로 실행되었습니다. depends_on_past = True을 사용하면 동일한 네 번째 작업에서 DagRun이 일시 중지되지만 처음 세 작업은 계속 실행됩니다.

DagRun DB 테이블에는 금요일 DagRun에 failed이 포함 된 state 열이 있습니다. 내가 원했던 것은 이전 DagRun이 실패한 경우 시작하지 않고 이전에 실패한 작업을 찾을 때까지 실행하지 않도록 DagRun을 구성하는 방법입니다.

가능하면 누구에게 알리십니까?

답변

0

한 가지 가능한 솔루션은 xcom을 사용하는 것입니다 :

  1. 는 DAG 2 PythonOperators start_taskend_task를 추가합니다.
  2. 다른 모든 작업 end_task 다른 모든 작업 (set_upstream)에 따라 확인 start_task
  3. 에 의존합니다.
  4. end_task은 항상 변수 last_success = context['execution_date']을 xcom()으로 푸시합니다. (PythonOperators에서 provide_context = True 필요).
  5. 그리고 start_task는 항상 이전 DagRun의 execution_date에 또는 DAG의 시작일과 동일한 값을 가진 last_success 변수가 존재 여부를 확인하기 위해 XCOM (xcom_pull)를 확인합니다 (프로세스 시작을 할 수 있습니다). XCOM의

사용 예 :
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py