Subdags are Python files in which you define a group of tasks inside a factory function, then call that function from a full-fledged DAG in the dags folder. By convention, subdags live in the dags/subdags folder.
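As a quick sketch, the layout used in this post ends up looking like this (the file names are the ones we create below):

dags/
├── group_dag.py
└── subdags/
    ├── subdag_downloads.py
    └── subdag_transforms.py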
First>>> create a folder named 'subdags' inside the 'dags' folder.
Second>>> create a file in that folder called 'subdag_downloads.py'. You can actually give it any name, because a subdag is just a Python file. Use the following template.
from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_downloads(parent_dag_id, child_dag_id, args):
    # The subdag's dag_id must be '<parent_dag_id>.<child_dag_id>'
    with DAG(f"{parent_dag_id}.{child_dag_id}",
             start_date=args['start_date'],
             schedule_interval=args['schedule_interval'],
             catchup=args['catchup']) as dag:

        download_a = BashOperator(
            task_id='download_a',
            bash_command='sleep 10'
        )
        download_b = BashOperator(
            task_id='download_b',
            bash_command='sleep 10'
        )
        download_c = BashOperator(
            task_id='download_c',
            bash_command='sleep 10'
        )

    return dag
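Note that no dependencies are declared between the three download tasks, so they run in parallel inside the subdag. If you wanted them to run one after another instead, you could chain them at the end of the same with block (an optional tweak, not used in the rest of this post):

        download_a >> download_b >> download_c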
Third>>> create another DAG in the main dags folder; let's define it in a file called 'group_dag.py'. Put the following script in the file.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime

from subdags.subdag_downloads import subdag_downloads

with DAG('group_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    # Reuse the parent DAG's settings so the subdag stays in sync with it
    args = {'start_date': dag.start_date,
            'schedule_interval': dag.schedule_interval,
            'catchup': dag.catchup}

    downloads = SubDagOperator(
        task_id='downloads',
        # 'downloads' is both the task_id and the child_dag_id
        subdag=subdag_downloads(dag.dag_id, 'downloads', args)
    )
    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )
    transform_a = BashOperator(
        task_id='transform_a',
        bash_command='sleep 10'
    )
    transform_b = BashOperator(
        task_id='transform_b',
        bash_command='sleep 10'
    )
    transform_c = BashOperator(
        task_id='transform_c',
        bash_command='sleep 10'
    )

    downloads >> check_files >> [transform_a, transform_b, transform_c]
The dag.dag_id refers to the unique identifier of the parent DAG ('group_dag' in this case). Passing dag.dag_id to the subdag function gives the subdag a way to reference the parent DAG's identifier.
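To make the naming concrete, here is what the f-string in the subdag factory resolves to (a small illustration, not part of the pipeline):

parent_dag_id = 'group_dag'   # dag.dag_id of the parent DAG
child_dag_id = 'downloads'    # must match the SubDagOperator's task_id
print(f"{parent_dag_id}.{child_dag_id}")  # -> group_dag.downloads

SubDagOperator expects the subdag's dag_id to follow exactly this parent_dag_id.task_id pattern, which is why the child_dag_id passed to the factory has to match the operator's task_id.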
This is how it looks now.
But we also want to group the transforms. To do that, create a subdag_transforms.py in the subdags folder and call it from inside group_dag.py. Paste the following code into subdag_transforms.py:

from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_transforms(parent_dag_id, child_dag_id, args):
    with DAG(f"{parent_dag_id}.{child_dag_id}",
             start_date=args['start_date'],
             schedule_interval=args['schedule_interval'],
             catchup=args['catchup']) as dag:

        transform_a = BashOperator(
            task_id='transform_a',
            bash_command='sleep 10'
        )
        transform_b = BashOperator(
            task_id='transform_b',
            bash_command='sleep 10'
        )
        transform_c = BashOperator(
            task_id='transform_c',
            bash_command='sleep 10'
        )

    return dag
Then group_dag.py has to be changed as follows.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime

from subdags.subdag_downloads import subdag_downloads
from subdags.subdag_transforms import subdag_transforms

with DAG('group_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    args = {'start_date': dag.start_date,
            'schedule_interval': dag.schedule_interval,
            'catchup': dag.catchup}

    downloads = SubDagOperator(
        task_id='downloads',
        # 'downloads' is both the task_id and the child_dag_id
        subdag=subdag_downloads(dag.dag_id, 'downloads', args)
    )
    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )
    transforms = SubDagOperator(
        task_id='transforms',
        subdag=subdag_transforms(dag.dag_id, 'transforms', args)
    )

    downloads >> check_files >> transforms
Yeah, that's it. You will have the following workflow: downloads >> check_files >> transforms, with the download and transform tasks grouped inside their own subdags.
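As a quick sanity check (assuming a standard Airflow 2 CLI), something like this should confirm the new structure and kick off a run:

airflow tasks list group_dag    # should show downloads, check_files and transforms
airflow dags trigger group_dag  # start a manual run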