2017-11-03 22 views
2

나는 여러 파일을 복사하는 DAG (DAG1)가 있습니다. 그런 다음 복사 된 각 파일에 대해 다른 DAG (DAG2)를 시작하려고합니다. DAG1이 실행될 때마다 복사되는 파일 수가 달라 지므로 파일을 반복 재생하고 DAG2에 적절한 매개 변수를 호출하고 싶습니다.TriggerDagRunOperator와 함께 다른 DAG를 여러 번 실행하십시오.

예 :

with DAG('DAG1', 
     description="copy files over", 
     schedule_interval="* * * * *", 
     max_active_runs=1 
    ) as dag: 


    t_rsync = RsyncOperator(task_id='rsync_data', 
     source='/source/', 
     target='/destination/') 

    t_trigger_preprocessing = TriggerDagRunOperator(task_id='trigger_preprocessing', 
     trigger_daq_id='DAG2', 
     python_callable=trigger 

    ) 

    t_rsync >> t_trigger_preprocessing 

제가 t_rsync DAG2을 XCOM로부터 해당 데이터를 가져와 다음 트리거 python_callable trigger를 사용하기를 바라고시켰다 그러나 나에게이 일을하는 방법이 분명하지 않다.

내가

답변

1

내 자신의 운영자 쓰기 결국 (또한 max_active_runs와 회로도를 스태킹 제공) DAG2의 내용을 단순화하기 위해 여기 DAG2를 호출의 논리를 넣어 선호 :와

class TriggerMultipleDagRunOperator(TriggerDagRunOperator): 
    def execute(self, context): 
     count = 0 
     for dro in self.python_callable(context): 
      if dro: 
       with create_session() as session: 
        dbag = DagBag(settings.DAGS_FOLDER) 
        trigger_dag = dbag.get_dag(self.trigger_dag_id) 
        dr = trigger_dag.create_dagrun(
         run_id=dro.run_id, 
         state=State.RUNNING, 
         conf=dro.payload, 
         external_trigger=True) 
        session.add(dr) 
        session.commit() 
        count = count + 1 
      else: 
       self.log.info("Criteria not met, moving on") 
     if count == 0: 
      raise AirflowSkipException('No external dags triggered') 

: 다음

def trigger_preprocessing(context): 
    for base_filename,_ in found.items(): 
     exp = context['ti'].xcom_pull(task_ids='parse_config', key='experiment') 
     run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat()) 
     dro = DagRunOrder(run_id=run_id) 
     d = { 
      'directory': context['ti'].xcom_pull(task_ids='parse_config', key='experiment_directory'), 
      'base': base_filename, 
      'experiment': exp['name'], 
     } 
     LOG.info('triggering dag %s with %s' % (run_id,d)) 
     dro.payload = d 
     yield dro 
    return 

과 같은 python_callable이와 함께 모든 넥타이

t_trigger_preprocessing = TriggerMultipleDagRunOperator(task_id='trigger_preprocessing', 
    trigger_dag_id='preprocessing', 
    python_callable=trigger_preprocessing 
)