1. Create the file project_folder/dags/xcom_dag.py with the following code:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1():
return 42
def _t2():
None
# Daily DAG (no catch-up of past runs) with three tasks executed in sequence.
# NOTE(review): Airflow 2.4+ deprecates `schedule_interval` in favour of
# `schedule`; confirm the target Airflow version before changing it.
with DAG(
    "xcom_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # t1 runs _t1, whose return value is what gets shared through XCom.
    t1 = PythonOperator(
        task_id='t1',
        python_callable=_t1,
    )

    # t2 runs the placeholder callable _t2.
    t2 = PythonOperator(
        task_id='t2',
        python_callable=_t2,
    )

    # t3 echoes an empty string from a shell.
    t3 = BashOperator(
        task_id='t3',
        bash_command="echo ''",
    )

    # Execution order: t1, then t2, then t3.
    t1 >> t2 >> t3
This was the first way of pushing an XCom. Now, let's have a look at the second way of pushing an XCom.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1(ti):
ti.xcom_push(key='my_key', value=42)
def _t2():
None
# Daily DAG (no catch-up of past runs) with three tasks executed in sequence.
# NOTE(review): Airflow 2.4+ deprecates `schedule_interval` in favour of
# `schedule`; confirm the target Airflow version before changing it.
with DAG(
    "xcom_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # t1 runs _t1, which pushes a value to XCom explicitly.
    t1 = PythonOperator(
        task_id='t1',
        python_callable=_t1,
    )

    # t2 runs the placeholder callable _t2.
    t2 = PythonOperator(
        task_id='t2',
        python_callable=_t2,
    )

    # t3 echoes an empty string from a shell.
    t3 = BashOperator(
        task_id='t3',
        bash_command="echo ''",
    )

    # Execution order: t1, then t2, then t3.
    t1 >> t2 >> t3
In summary, the _t1(ti) function pushes the value 42 with the key 'my_key' to the XCom system for the specific task instance. This allows other tasks downstream in the DAG to access this value using the same key through the XCom system. The data pushed via XCom can be used for sharing information or transferring small amounts of data between tasks within the same DAG.
xcom_pull(key='my_key', task_ids='t1') is used to fetch the data from the previous push. Here is the full code below.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1(ti):
ti.xcom_push(key='my_key', value=42)
def _t2(ti):
print(ti.xcom_pull(key='my_key', task_ids='t1'))
# Daily DAG (no catch-up of past runs) with three tasks executed in sequence.
# NOTE(review): Airflow 2.4+ deprecates `schedule_interval` in favour of
# `schedule`; confirm the target Airflow version before changing it.
with DAG(
    "xcom_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # t1 runs _t1, which pushes a value to XCom explicitly.
    t1 = PythonOperator(
        task_id='t1',
        python_callable=_t1,
    )

    # t2 runs _t2, which pulls the value pushed by t1 and prints it.
    t2 = PythonOperator(
        task_id='t2',
        python_callable=_t2,
    )

    # t3 echoes an empty string from a shell.
    t3 = BashOperator(
        task_id='t3',
        bash_command="echo ''",
    )

    # Execution order: t1, then t2, then t3 (t2 must run after t1's push).
    t1 >> t2 >> t3
No comments:
Post a Comment