from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1(ti):
ti.xcom_push(key='my_key', value=42)
def _t2(ti):
print(ti.xcom_pull(key='my_key', task_ids='t1'))
def _branch(ti):
value=ti.xcom_pull(key='my_key', task_ids='t1')
if (value == 42):
return 't2' # if the condition meets, we go to the next task, or else we skip
return 't3' # we are skipping to the next task at hand.
with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
schedule_interval='@daily', catchup=False) as dag:
t1=PythonOperator(
task_id='t1',
python_callable= _t1
)
branch=BranchPythonOperator(
task_id='branch',
python_callable=_branch
)
t2=PythonOperator(
task_id='t2',
python_callable= _t2
)
t3=BashOperator(
task_id='t3',
bash_command= "echo ''"
)
t4=BashOperator(
task_id='t4',
bash_command="echo ''",
trigger_rule='none_failed_min_one_success'
)
t1 >> branch >> [t2,t3] >> t4
More about trigger_rule?
go here: Airflow Trigger Rules: All you need to know! - Marc Lamberti
No comments:
Post a Comment