Tuesday, 6 June 2023

Using XComs in Airflow

1>> I created the file project_folder/dags/xcom_dag.py with the following DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1():
    return 42

def _t2():
    pass  # placeholder: t2 does nothing yet

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3

This was the first way of pushing an XCom: returning a value from the callable. Now let's look at the second way, calling xcom_push explicitly:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=42)

def _t2():
    pass  # placeholder: t2 does nothing yet

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3

In summary, _t1(ti) pushes the value 42 under the key 'my_key' as an XCom of the t1 task instance. Downstream tasks in the DAG can then read the value by pulling with the same key and the task id 't1'. XComs are meant for passing small pieces of data between tasks, not for moving large datasets.

ti.xcom_pull(key='my_key', task_ids='t1') fetches the value that t1 pushed. Here is the full code:


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=42)

def _t2(ti):
    print(ti.xcom_pull(key='my_key', task_ids='t1'))

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3
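
In the code above t3 still echoes an empty string. Since bash_command is a templated field, one way to let t3 print the same XCom is a Jinja expression that calls xcom_pull on `ti`. The snippet below is a sketch under that assumption; the constant name `T3_BASH_COMMAND` is made up for illustration and is not part of the original DAG:

```python
# Assumption: this string replaces the placeholder "echo ''" passed to t3.
# Airflow renders the Jinja expression before the shell command runs,
# so the shell only ever sees the pulled value.
T3_BASH_COMMAND = (
    "echo \"{{ ti.xcom_pull(key='my_key', task_ids='t1') }}\""
)

# Wiring it into the DAG from the post (needs Airflow, so shown as a comment):
# t3 = BashOperator(task_id='t3', bash_command=T3_BASH_COMMAND)
```

The key and task id match the ones used in _t1 and _t2, so t2 and t3 would both read the value pushed by t1.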










