2016-11-22 1 views
2

파이썬 함수에서 default_args start_date를 참조 할 수 있습니까?공기 흐름 ETL 파이프 라인 - 기능에서 일정 날짜 사용?

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2016, 11, 21), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=1) 
} 

내 파이썬 스크립트는 주로이 성명을 발표하는 하위 프로세스를 사용

query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(database,                select_database(database)[table_int], 
                     query_date) 
command = 'BCP {} queryout \"{}\" -t, -c -a 10240 -S "server" -T'.format(query, os.path.join(path, filename)) 

내가 '어디 날짜 = {} 테이블에서 선택 *'를 쿼리 BCP를 사용하고 실행하려는 작업. 현재 파이썬 스크립트는 날짜 변수에 대한 모든 로직을 가지고 있습니다 (기본값은 어제 임). 그러나 대신 default_arg를 참조하고 기류가 날짜를 처리하도록하는 것이 좋습니다.

간단히하기 위해 default_arg start_date를 사용하고 일정 (매일 실행)을 사용하여 BCP 명령에서 변수를 채우고 싶습니다. 이게 올바른 접근인가 아니면 파이썬 스크립트에 날짜 논리를 유지해야합니까?

답변

3

이것은 올바른 접근 방법이지만 실제로 필요한 것은 이 아니라 start_date입니다. provide_context=True 매개 변수를 사용하여 execution_date을 PythonOperator의 컨텍스트를 통해 'ds' 기본 변수로 가져올 수 있습니다. provide_context=True 매개 변수는 Jinja 템플릿에서 사용 된 기본 변수 집합을 kwargs 인수로 전달합니다. 문서의 관련 섹션에서 Default Variables 및 Jinja Templating에 대한 자세한 내용을 볼 수 있습니다. ,

def query_db(**kwargs): 
    #get execution date in format YYYY-MM-DD 
    query_date = kwargs.get('ds') 

    #rest of your logic 

t_query_db = PythonOperator( task_id='query_db', python_callable=query_db, provide_context=True, dag=dag)

+0

쿨 날이 진짜 빨리 놀러하자 https://airflow.incubator.apache.org/code.html#default-variables https://airflow.incubator.apache.org/concepts.html#jinja-templating

코드는 다음과 같이한다. 나는 처음에는 혼란 스러웠다. 그러나 'ds'와 'yesterday_ds'등을 보여주는 API 문서에서 매크로 섹션을 발견했다. – trench

+0

예, 실제로는 매우 혼란 스럽기 때문에 분명히해야했다. 나는 대답을 업데이트 했으므로 조금만 더 잘 설명하면된다. –