1 >> group_downloads.py: the groups folder has a file called group_downloads.py. Here is how it looks:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def download_tasks():
    # Called from inside a 'with DAG(...)' block, so the group attaches to that DAG
    with TaskGroup("downloads", tooltip="Download tasks") as group:
        download_a = BashOperator(
            task_id='download_a',
            bash_command='sleep 10'
        )
        download_b = BashOperator(
            task_id='download_b',
            bash_command='sleep 10'
        )
        download_c = BashOperator(
            task_id='download_c',
            bash_command='sleep 10'
        )
    return group
The tooltip parameter in the TaskGroup constructor is used to provide an optional tooltip description for the task group.
A tooltip is a small pop-up box that appears when you hover over an element, such as a task group, in the Airflow user interface (UI). It can be helpful to provide additional information or context about the task group.
In the code snippet above, tooltip="Download tasks" assigns the tooltip description "Download tasks" to the task group named "downloads". This tooltip is displayed when a user hovers over the task group in the Airflow UI, letting them quickly understand the purpose of the tasks grouped under "downloads".
Adding tooltips can be useful for enhancing the usability and clarity of your Airflow UI, especially when you have multiple task groups or complex workflows.
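One related detail worth knowing: by default (prefix_group_id=True), Airflow 2.x prefixes each child task's ID with the group ID, so the tasks above end up as downloads.download_a, downloads.download_b and downloads.download_c. The snippet below is a toy pure-Python model of that naming rule, not Airflow's own code; the function name effective_task_ids is hypothetical.

```python
# Toy model (not Airflow's code) of how a TaskGroup names its child tasks.
# Assumption: mirrors TaskGroup's default prefix_group_id=True behaviour,
# where a child's effective task_id becomes "<group_id>.<task_id>".

def effective_task_ids(group_id, task_ids, prefix_group_id=True):
    """Return the task IDs as they would appear in the DAG (hypothetical helper)."""
    if not prefix_group_id:
        return list(task_ids)
    return [f"{group_id}.{task_id}" for task_id in task_ids]


downloads = effective_task_ids("downloads", ["download_a", "download_b", "download_c"])
print(downloads)
# ['downloads.download_a', 'downloads.download_b', 'downloads.download_c']
```

This prefix is the ID you would use when referring to a grouped task from elsewhere in the DAG.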
2 >> Next we work on the group_dag.py file. We no longer need the SubDagOperator(); instead, we import the task group function from the groups folder and call it inside the DAG.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from groups.group_downloads import download_tasks

with DAG('group_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    # 'downloads' is the group_id of the TaskGroup returned by download_tasks()
    downloads = download_tasks()

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )

    # transforms = transform_tasks()
    downloads >> check_files  # >> transforms
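Setting a task group upstream of a task makes every leaf task of the group (a task with nothing downstream inside the group) upstream of that task; setting a group downstream of a task wires the task into the group's root tasks. Because download_a, download_b and download_c have no dependencies among themselves, all three must finish before check_files starts. The toy model below sketches that rule; Task and Group are hypothetical stand-ins, not Airflow's classes or implementation.

```python
# Toy model of the group dependency rule; Task and Group are hypothetical
# stand-ins, not Airflow classes.

class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # IDs of tasks that must finish first

    def __rshift__(self, other):
        # task >> task, or task >> group (wire into the group's root tasks)
        targets = other.roots() if isinstance(other, Group) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other


class Group:
    def __init__(self, group_id, task_ids):
        # children named "<group_id>.<task_id>", as with prefix_group_id=True
        self.tasks = [Task(f"{group_id}.{t}") for t in task_ids]

    def roots(self):
        # tasks with no upstream task inside this group
        ids = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not (t.upstream & ids)]

    def leaves(self):
        # tasks that no other task inside this group depends on
        needed = set().union(*(t.upstream for t in self.tasks))
        return [t for t in self.tasks if t.task_id not in needed]

    def __rshift__(self, other):
        # group >> x: every leaf of the group becomes upstream of x
        for leaf in self.leaves():
            leaf >> other
        return other


downloads = Group("downloads", ["download_a", "download_b", "download_c"])
check_files = Task("check_files")
downloads >> check_files
print(sorted(check_files.upstream))
# ['downloads.download_a', 'downloads.download_b', 'downloads.download_c']
```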
Let's do the same for group_transforms.py, also in the groups folder:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def transform_tasks():
    # Called from inside a 'with DAG(...)' block, so the group attaches to that DAG
    with TaskGroup("transforms", tooltip="Transform tasks") as group:
        transform_a = BashOperator(
            task_id='transform_a',
            bash_command='sleep 10'
        )
        transform_b = BashOperator(
            task_id='transform_b',
            bash_command='sleep 10'
        )
        transform_c = BashOperator(
            task_id='transform_c',
            bash_command='sleep 10'
        )
    return group
Now update group_dag.py to import transform_tasks and place the transforms group after check_files:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from groups.group_downloads import download_tasks
from groups.group_transforms import transform_tasks

with DAG('group_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    # TaskGroup with group_id 'downloads' (download_a, download_b, download_c)
    downloads = download_tasks()

    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )

    # TaskGroup with group_id 'transforms' (transform_a, transform_b, transform_c)
    transforms = transform_tasks()

    # all downloads -> check_files -> all transforms
    downloads >> check_files >> transforms
[Figure: this is how your graph is going to look]
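With both groups wired in, downloads >> check_files >> transforms produces three stages: the three downloads, then check_files, then the three transforms. The sketch below reproduces that ordering with Python's standard graphlib module (Python 3.9+). The helpers chain_stages and waves are hypothetical, and the sketch assumes, as in this DAG, that the tasks inside each group are independent of one another. Each batch lists tasks that could run in parallel; how many actually run at once in Airflow depends on its concurrency settings.

```python
from graphlib import TopologicalSorter


def chain_stages(*stages):
    """Build {task: upstream_tasks} for stage_1 >> stage_2 >> ...

    Assumes the tasks within a stage are independent, so every task in a
    stage waits on every task of the previous stage.
    """
    graph = {}
    previous = []
    for stage in stages:
        for task_id in stage:
            graph[task_id] = set(previous)
        previous = list(stage)
    return graph


def waves(graph):
    """Group tasks into batches whose upstream tasks have all finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


downloads = ["downloads.download_a", "downloads.download_b", "downloads.download_c"]
transforms = ["transforms.transform_a", "transforms.transform_b", "transforms.transform_c"]
graph = chain_stages(downloads, ["check_files"], transforms)
for batch in waves(graph):
    print(batch)
```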