2017-12-24 27 views
5

공기 흐름에서 나는 job_flow_id을 내 emr-steps 중 하나에 전달해야한다는 문제에 직면하고 있습니다. 운영자로부터 job_flow_id을 검색 할 수 있지만 클러스터에 제출할 단계를 만들 때 task_instance 값이 올바르지 않습니다.공기 흐름 - EMR 운영자의 작업 인스턴스

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

문제는 내가 대신 load_data 단계에서 --cluster-id j-1234을 보는의 EMR을 확인할 때, 나는 --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}"를 볼 수 있다는 것입니다, 내 단계가 실패합니다 : 나는 다음과 같은 코드가 있습니다.

스텝 기능 내에서 실제 값을 얻으려면 어떻게해야합니까?

감사하고 행복한 휴일

+0

따옴표없이 값을 추가하려고 시도 했습니까? –

+0

어디서'''task_instance'''를 얻을 수 있습니까? (load_data_steps [0] [ "HadoopJarStep"] [ "Args"]. 에서 물체? 나는 그것을 사용하는 방법을 아직도 배우고있다. – davideberdin

답변

3

나는 공기 흐름 저장소에 대한 this에 PR이 있음을 발견했다. 문제는 EmrAddStepsOperator의 단계에 대한 템플릿이 없다는 것입니다.

  • 만든 EmrAddStepsOperator
  • 추가 된이 연산자에서 플러그인으로 상속 사용자 정의 연산자는
  • 호출 내 DAG에서 새로 운영자가 여기에

파일이 문제를 극복하기 위해, 나는 다음과 같은했다 사용자 지정 연산자 및 파일의 코드 custom_emr_add_step_operator.py (아래 트리 참조)

from __future__ import division, absolute_import, print_function 

from airflow.plugins_manager import AirflowPlugin 
from airflow.utils import apply_defaults 

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 


class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 

    @apply_defaults 
    def __init__(
      self, 
      *args, **kwargs): 
     super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     super(CustomEmrAddStepsOperator, self).execute(context=context) 


# Defining the plugin class 
class CustomPlugin(AirflowPlugin): 
    name = "custom_plugin" 
    operators = [CustomEmrAddStepsOperator] 
내 DAG 파일에서

나는이 방법

from airflow.operators import CustomEmrAddStepsOperator 
의 플러그인을 호출

내 프로젝트 및 플러그인의 구조는 다음과 같습니다

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

당신이 그런 PyCharm 같은 IDE를 사용하는 경우,이 의지 그것이 모듈을 찾을 수 없다고 말하기 때문에 불평한다. 그러나 Airflow를 실행하면이 문제가 나타나지 않습니다. airflow.cfg에서 오른쪽 plugins 폴더를 가리키고 있기 때문에 Airflow가 새로 생성 된 플러그인을 읽을 수 있습니다.