2017-02-03 2 views
0

공기 흐름 BashOperator에 명령 줄 인수를 전달하는 방법이 있습니까? 현재, 날짜 인수를 받아 특정 날짜보다 오래된 특정 폴더를 정리하는 것과 같은 특정 작업을 수행하는 python 스크립트가 있습니다. 하나의 작업을 단순화 된 코드에서공기 흐름에 명령 줄 인수 전달 BashOperator

은, 내가 뭘하고 싶은 사전에

from __future__ import print_function 
from airflow.operators import BashOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 

default_args = { 
    'owner'    : 'airflow' 
    ,'depends_on_past' : False 
    ,'start_date'  : datetime(2017, 01, 18) 
    ,'email'   : ['[email protected]'] 
    ,'retries'   : 1 
    ,'retry_delay'  : timedelta(minutes=5) 
} 

dag = DAG(
    dag_id='data_dir_cleanup' 
    ,default_args=default_args 
    ,schedule_interval='0 13 * * *' 
    ,dagrun_timeout=timedelta(minutes=10) 
    ) 

cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log' 
     #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line) 
     #,env=env 
     ,dag=dag 
    ) 

덕분이다

답변

0

시도 :

os.system을 ("당신이 여기"명령)

0

BashOperator는 Jinja2로 템플릿 화되어있어 임의의 값을 전달할 수 있습니다.

cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command="python cleanup.py --date {{ DATE }} 2>&1 >> /tmp/airflow/data_dir_cleanup.log" 
     ,params = {'DATE' : 'this-should-be-a-date'} 
     ,dag=dag 
    ) 

도 참조 : https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja 넓은 예를 들어, 귀하의 경우에는이 같은 것을 할 것이다.

0

BashOperator는 Jinja 템플릿이 적용되어 있으므로 params를 사전으로 전달할 수 있습니다.

에어 플로가 작업을 예약하고 param을 묻지 않습니다. "특정 날짜를 명령 줄 매개 변수로 전달해야합니다."라고 말할 때 불가능합니다. 공기 흐름은 DAG가 실행되도록 예약 및 그 BashOperator PARAMS 사용하여 매크로 {{DS}} 또는 {{ds_nodash}} (https://airflow.incubator.apache.org/code.html#macros)

env = {} 
env['DATE'] = '{{ ds }}' 
cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log' 
     ,params=env 
     ,dag=dag 
    ) 
에 전달 될 수있는 날짜입니다 실행 DATE의 개념을 가지고 있지만

는 "DATE"이 PARAM이 스크립트를 비난하고

+0

이 솔루션을 시도했지만 DS 렌더링되지 않았습니다. ds param 전달할 수있는 방법을 찾을 수 없습니다 !! – Omar14

0

다음과 같은 시도 할 수 있습니다 $ 날짜와 다른 bash는 변수로 사용할 수 있습니다 전달 될 것이다 (나를 위해 일한) :

cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'" 

t = BashOperator(
    task_id = 'some_id', 
    bash_command = cmd_command, 
    dag = your_dag_object_name) 

내가 그렇게했을 때 변수를 렌더링하고 잘 작동했습니다. 나는 그것이 모든 변수에서 작동한다고 믿는다. (.py 스크립트를 실행하기 때문에 내 명령의 시작 부분에 'python'이라는 단어를 넣었다.)

내 작업이 이들 변수를 읽으려면 제대로 작성되어야한다. 명령 줄 인수 (sys.argv 특성).