공기 흐름에서 나는 job_flow_id
을 내 emr-steps 중 하나에 전달해야한다는 문제에 직면하고 있습니다. 운영자로부터 job_flow_id
을 검색 할 수 있지만 클러스터에 제출할 단계를 만들 때 task_instance
값이 올바르지 않습니다.공기 흐름 - EMR 운영자의 작업 인스턴스
def issue_step(name, args):
return [
{
"Name": name,
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "s3://....",
"Args": args
}
}
]
dag = DAG('example',
description='My dag',
schedule_interval='0 8 * * 6',
dagrun_timeout=timedelta(days=2))
try:
create_emr = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id='aws_default',
dag=dag
)
load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
"{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id
load_data = EmrAddStepsOperator(
task_id='load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others
aws_conn_id='aws_default',
steps=load_data_steps,
dag=dag
)
check_load_data = EmrStepSensor(
task_id='watch_load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
create_emr_recommendations >> load_data
load_data >> check_load_data
check_load_data >> cluster_remover
except AirflowException as ae:
print ae.message
문제는 내가 대신 load_data
단계에서 --cluster-id j-1234
을 보는의 EMR을 확인할 때, 나는 --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}"
를 볼 수 있다는 것입니다, 내 단계가 실패합니다 : 나는 다음과 같은 코드가 있습니다.
스텝 기능 내에서 실제 값을 얻으려면 어떻게해야합니까?
감사하고 행복한 휴일
따옴표없이 값을 추가하려고 시도 했습니까? –
어디서'''task_instance'''를 얻을 수 있습니까? (load_data_steps [0] [ "HadoopJarStep"] [ "Args"]. 에서 물체? 나는 그것을 사용하는 방법을 아직도 배우고있다. – davideberdin