Sunday, 4 June 2023

Creating and Using subdags in airflow

Subdags are Python files in which you define tasks; you can then call them from a full-fledged DAG in the dags folder. Subdags live in the dags/subdags folder as ordinary Python modules.


First, create a folder named 'subdags' inside the 'dags' folder.

Second, create a file in that folder called 'subdag_downloads.py'. You can give it any name, because a subdag is just a Python file. Follow the template below.

from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_downloads(parent_dag_id, child_dag_id, args):
    """Build and return the download sub-DAG.

    The DAG id must be '<parent_dag_id>.<child_dag_id>' so that
    SubDagOperator can attach it to the parent. The `args` dict is
    expected to carry 'start_date', 'schedule_interval' and 'catchup'
    copied from the parent DAG so parent and child stay in sync.
    """
    with DAG(
        f"{parent_dag_id}.{child_dag_id}",
        start_date=args['start_date'],
        schedule_interval=args['schedule_interval'],
        catchup=args['catchup'],
    ) as dag:
        # Three independent placeholder download tasks (no dependencies,
        # so they can run in parallel). Registering them inside the
        # `with DAG(...)` context attaches them to this sub-DAG.
        for name in ('download_a', 'download_b', 'download_c'):
            BashOperator(task_id=name, bash_command='sleep 10')
        return dag

Third, create another DAG in the main dags folder; let's define it in a file called 'group_dag.py'. In that file, use the following script.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime
from subdags.subdag_downloads import subdag_downloads

with DAG('group_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    # Propagate the parent's scheduling settings so the sub-DAG
    # runs on exactly the same schedule as its parent.
    args = {
        'start_date': dag.start_date,
        'schedule_interval': dag.schedule_interval,
        'catchup': dag.catchup,
    }

    # 'downloads' serves as both the task_id and the child dag id.
    downloads = SubDagOperator(
        task_id='downloads',
        subdag=subdag_downloads(dag.dag_id, 'downloads', args),
    )

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10',
    )

    # Three parallel placeholder transform tasks.
    transform_a, transform_b, transform_c = (
        BashOperator(task_id=f'transform_{suffix}', bash_command='sleep 10')
        for suffix in 'abc'
    )

    downloads >> check_files >> [transform_a, transform_b, transform_c]

The dag.dag_id refers to the unique identifier (name) of the parent DAG ('group_dag' in this case). Passing dag.dag_id to the subdag function gives the child DAG a way to reference its parent's identifier.

This is how it looks like now.


But we also want to group the transform tasks. To do that, create a subdag_transforms.py file and call it from inside group_dag.py. Paste the following code into the subdag_transforms.py file.

from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_transforms(parent_dag_id, child_dag_id, args):
    """Build and return the transform sub-DAG.

    Mirrors subdag_downloads: the DAG id is
    '<parent_dag_id>.<child_dag_id>' (required by SubDagOperator),
    and `args` supplies 'start_date', 'schedule_interval' and
    'catchup' taken from the parent DAG.
    """
    with DAG(
        f"{parent_dag_id}.{child_dag_id}",
        start_date=args['start_date'],
        schedule_interval=args['schedule_interval'],
        catchup=args['catchup'],
    ) as dag:
        # Three independent placeholder transform tasks, created inside
        # the DAG context so they are registered on this sub-DAG.
        for name in ('transform_a', 'transform_b', 'transform_c'):
            BashOperator(task_id=name, bash_command='sleep 10')
        return dag

group_dag.py then has to be changed as follows.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime
from subdags.subdag_downloads import subdag_downloads
from subdags.subdag_transforms import subdag_transforms

with DAG('group_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    # Copy the parent's scheduling settings so both sub-DAGs
    # stay in lockstep with this DAG's schedule.
    args = {
        'start_date': dag.start_date,
        'schedule_interval': dag.schedule_interval,
        'catchup': dag.catchup,
    }

    # For each SubDagOperator, the task_id doubles as the child dag id.
    downloads = SubDagOperator(
        task_id='downloads',
        subdag=subdag_downloads(dag.dag_id, 'downloads', args),
    )

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10',
    )

    transforms = SubDagOperator(
        task_id='transforms',
        subdag=subdag_transforms(dag.dag_id, 'transforms', args),
    )

    downloads >> check_files >> transforms

That's it — you will end up with the following workflow.




































No comments:

Post a Comment