Monday, 5 June 2023

TaskGroups in Airflow: a new way of organizing subtasks

 




1 >> group_downloads.py: the groups folder contains a file called group_downloads.py. Here is how it looks:

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup


def download_tasks():
    # Tasks created inside this block belong to the "downloads" group.
    # The group attaches to whichever DAG is active when the function is called.
    with TaskGroup("downloads", tooltip="Download tasks") as group:

        download_a = BashOperator(
            task_id='download_a',
            bash_command='sleep 10'
        )
        download_b = BashOperator(
            task_id='download_b',
            bash_command='sleep 10'
        )
        download_c = BashOperator(
            task_id='download_c',
            bash_command='sleep 10'
        )

    # Return the group so the caller can set dependencies on it with >>
    return group

The tooltip parameter in the TaskGroup constructor is used to provide an optional tooltip description for the task group.

A tooltip is a small pop-up box that appears when you hover over an element, such as a task group, in the Airflow user interface (UI). It can be helpful to provide additional information or context about the task group.

In the snippet above, tooltip="Download tasks" sets the tooltip text of the task group named "downloads". The text appears when a user hovers over the group in the Airflow UI, so they can quickly see what the grouped tasks are for.

Adding tooltips can be useful for enhancing the usability and clarity of your Airflow UI, especially when you have multiple task groups or complex workflows.
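Besides the tooltip, the group ID changes how the tasks are named. TaskGroup defaults to prefix_group_id=True, so Airflow prefixes each task ID with the group ID: the three tasks above are registered as downloads.download_a, downloads.download_b and downloads.download_c. That full ID is what you need when referring to a grouped task elsewhere, for example in xcom_pull(task_ids=...). The function below, qualified_task_id, is a hypothetical plain-Python helper (not part of Airflow) that sketches this naming rule for a single, non-nested group.

```python
def qualified_task_id(group_id, task_id, prefix_group_id=True):
    """Toy model of the ID Airflow gives a task inside one (non-nested) TaskGroup.

    With prefix_group_id=True (Airflow's default) the group ID and task ID are
    joined with a dot; with False the task keeps its bare ID.
    """
    if not prefix_group_id:
        return task_id
    return f"{group_id}.{task_id}"


for name in ("download_a", "download_b", "download_c"):
    print(qualified_task_id("downloads", name))
# downloads.download_a
# downloads.download_b
# downloads.download_c
```

Turning the prefix off keeps IDs short, but then every task ID must be unique across the whole DAG on its own.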

2 >> Next, we work on group_dag.py. We no longer need the SubDagOperator: instead, we import the task group function from the groups folder and call it inside the DAG.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from groups.group_downloads import download_tasks

with DAG('group_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:

    # Creates the "downloads" TaskGroup; 'downloads' is the group_id, and its
    # tasks get IDs such as downloads.download_a (no child DAG is involved)
    downloads = download_tasks()

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )

    # transforms = transform_tasks()

    downloads >> check_files  # >> transforms
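downloads >> check_files works because a TaskGroup supports the same >> operator as a single task. When a group is upstream, Airflow wires the group's leaf tasks (those with no downstream task inside the group) to the next task; when a group is downstream, the edges go to its root tasks. The three download tasks have no dependencies among themselves, so each is both a root and a leaf, and all three end up upstream of check_files. The sketch below models that rule in plain Python; ToyGroup and chain_edges are hypothetical names that only mimic the behavior and are not Airflow classes.

```python
class ToyGroup:
    """Minimal stand-in for a TaskGroup: prefixed task IDs plus edges inside the group."""

    def __init__(self, group_id, task_ids, internal_edges=()):
        self.task_ids = [f"{group_id}.{t}" for t in task_ids]
        self.internal_edges = {(f"{group_id}.{a}", f"{group_id}.{b}") for a, b in internal_edges}

    def roots(self):
        # Tasks with no upstream task inside the group.
        has_upstream = {b for _, b in self.internal_edges}
        return [t for t in self.task_ids if t not in has_upstream]

    def leaves(self):
        # Tasks with no downstream task inside the group.
        has_downstream = {a for a, _ in self.internal_edges}
        return [t for t in self.task_ids if t not in has_downstream]


def chain_edges(upstream, downstream):
    """Task-level edges produced by `upstream >> downstream` (each side a task ID or a ToyGroup)."""
    sources = upstream.leaves() if isinstance(upstream, ToyGroup) else [upstream]
    targets = downstream.roots() if isinstance(downstream, ToyGroup) else [downstream]
    return [(s, t) for s in sources for t in targets]


downloads = ToyGroup("downloads", ["download_a", "download_b", "download_c"])
for edge in chain_edges(downloads, "check_files"):
    print(edge)
# ('downloads.download_a', 'check_files')
# ('downloads.download_b', 'check_files')
# ('downloads.download_c', 'check_files')
```

If you later chain tasks inside a group (say download_a >> download_b), only the group's first and last tasks would be connected to the outside.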


Let's do the same for the transforms in group_transforms.py:

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup


def transform_tasks():
    # Same pattern as download_tasks(): the tasks below become
    # transforms.transform_a, transforms.transform_b and transforms.transform_c
    with TaskGroup("transforms", tooltip="Transform tasks") as group:

        transform_a = BashOperator(
            task_id='transform_a',
            bash_command='sleep 10'
        )
        transform_b = BashOperator(
            task_id='transform_b',
            bash_command='sleep 10'
        )
        transform_c = BashOperator(
            task_id='transform_c',
            bash_command='sleep 10'
        )

    return group
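group_transforms.py mirrors group_downloads.py with different names. Thanks to the default prefix_group_id=True, which prepends the group ID to each task ID, the two files would not even need distinct task names: Airflow refuses to register two tasks with the same ID in one DAG, but downloads.a and transforms.a are different IDs. The plain-Python sketch below checks this for a hypothetical variant where both groups reuse the short names a, b and c; dag_task_ids and duplicates are illustrative helpers, not Airflow functions.

```python
from collections import Counter


def dag_task_ids(groups, prefix_group_id=True):
    """Toy model: every task ID a DAG would register for the given groups.

    groups maps a group ID to the task IDs defined inside it (an input format
    assumed for this sketch).
    """
    ids = []
    for group_id, task_ids in groups.items():
        for task_id in task_ids:
            ids.append(f"{group_id}.{task_id}" if prefix_group_id else task_id)
    return ids


def duplicates(ids):
    """Task IDs that appear more than once, sorted."""
    return sorted(i for i, n in Counter(ids).items() if n > 1)


# Hypothetical variant: both groups reuse the same short task names.
groups = {"downloads": ["a", "b", "c"], "transforms": ["a", "b", "c"]}
print(duplicates(dag_task_ids(groups)))                         # []
print(duplicates(dag_task_ids(groups, prefix_group_id=False)))  # ['a', 'b', 'c']
```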

Now update group_dag.py to import and call transform_tasks():

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from groups.group_downloads import download_tasks
from groups.group_transforms import transform_tasks

with DAG('group_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:

    # "downloads" TaskGroup: downloads.download_a, downloads.download_b, downloads.download_c
    downloads = download_tasks()

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )

    # "transforms" TaskGroup: transforms.transform_a, transforms.transform_b, transforms.transform_c
    transforms = transform_tasks()

    # All downloads must succeed before check_files; all transforms wait on check_files
    downloads >> check_files >> transforms
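With downloads >> check_files >> transforms, the three downloads can run in parallel, check_files waits for all of them, and the three transforms start only after check_files succeeds (under Airflow's default all_success trigger rule). The sketch below derives those stages with a level-by-level topological sort (Kahn's algorithm) over the task-level edges. It is a plain-Python illustration of the dependency structure, assuming the prefixed task IDs shown in the comments above; it is not how Airflow's scheduler works internally.

```python
from collections import defaultdict


def stages(tasks, edges):
    """Kahn's algorithm by levels: a task joins a stage once all its upstream tasks are done."""
    indegree = {t: 0 for t in tasks}
    downstream = defaultdict(list)
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1

    ready = sorted(t for t, d in indegree.items() if d == 0)
    result = []
    while ready:
        result.append(ready)
        next_ready = []
        for task in ready:
            for child in downstream[task]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    if sum(len(s) for s in result) != len(tasks):
        raise ValueError("cycle detected")
    return result


downloads = [f"downloads.download_{x}" for x in "abc"]
transforms = [f"transforms.transform_{x}" for x in "abc"]
# downloads >> check_files >> transforms, expanded to task-level edges
edges = [(d, "check_files") for d in downloads] + [("check_files", t) for t in transforms]

for i, stage in enumerate(stages(downloads + ["check_files"] + transforms, edges), start=1):
    print(i, stage)
# 1 ['downloads.download_a', 'downloads.download_b', 'downloads.download_c']
# 2 ['check_files']
# 3 ['transforms.transform_a', 'transforms.transform_b', 'transforms.transform_c']
```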

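In the Graph view, the DAG now shows the downloads group, then check_files, then the transforms group; each group can be expanded to show its three tasks.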