Saturday 16 December 2023

DAX Query and Modelling Tut 1

This is something very interesting.

We can create a table called Calendar in Power BI. How do we do that? Yes, you are right, we will use DAX.

Year = DISTINCT(
    SELECTCOLUMNS(
        CALENDAR(DATE(1961, 1, 1), TODAY()),
        "Year", YEAR([Date])
    )
)


Another intuitive way of getting a calendar table is this:
EVALUATE
CALENDAR(DATE(2023 - 60, 1, 1), TODAY())

Now we will create another calculated column in the same Year table and call it Millenium.

Millenium = 'Year'[Year] - MOD('Year'[Year], 1000)

What does MOD represent here? MOD stands for modulus: the remainder left over after a division, which here is always a whole number. For example, MOD(1987, 1000) = 987, so the Millenium value for 1987 is 1987 - 987 = 1000.


Century = 'Year'[Year] - MOD('Year'[Year], 100)

If we want to test DAX measures or calculated columns, we can use the DAX Query View. For example, the query below returns a one-row table with a column called "whatever" holding MOD(5, 2) = 1.

EVALUATE
   SUMMARIZECOLUMNS("whatever", MOD(5, 2))

Now, let's create a Decade column. Put your code in the comments below.



Friday 16 June 2023

Filtering using pandas


The code below will filter the rows where the 'Year' column is 2017 and select only the 'Year' and 'Revenue (GBP Millions)' columns, then print the filtered rows with the corresponding values for both columns.

Make sure to replace 'Year' and 'Revenue (GBP Millions)' with the actual column names in your dataframe. If you have more columns that you want to include in the filtered result, you can add them within the double square brackets, separated by commas.

Note: If you want to perform any calculations or further data analysis on the filtered data, it's recommended to assign it to a new variable, like filtered_data, rather than modifying the original dataframe directly.

filtered_data = df[df['Year'] == 2017][['Year', 'Revenue (GBP Millions)']]

print(filtered_data)
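
If you want to go a step further and analyse the filtered result, a minimal sketch could look like this (the DataFrame below is hypothetical sample data, reusing the same column names as above):

import pandas as pd

# hypothetical sample data with the same column names used above
df = pd.DataFrame({
    'Year': [2016, 2017, 2017],
    'Revenue (GBP Millions)': [10.5, 12.0, 7.5]
})

# filter to 2017 and keep only the two columns of interest
filtered_data = df[df['Year'] == 2017][['Year', 'Revenue (GBP Millions)']]

# further analysis on the filtered copy, e.g. total 2017 revenue
total_revenue_2017 = filtered_data['Revenue (GBP Millions)'].sum()
print(total_revenue_2017)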


Scalar Subquery in Case When SQL

Let's first create a table:

CREATE DATABASE blog_portal;

USE blog_portal;

-- Create the Employee table
CREATE TABLE Employee (
    employee_id INT,
    team_id INT,
    PRIMARY KEY (employee_id)
);

-- Insert the data into the Employee table
INSERT INTO Employee (employee_id, team_id)
VALUES (1, 8),
       (2, 8),
       (3, 8),
       (4, 7),
       (5, 9),
       (6, 9);

Write an SQL query to find the team size for each employee. Return the result table in any order. Given the data above, the expected result is:

employee_id | team_size
1           | 3
2           | 3
3           | 3
4           | 1
5           | 2
6           | 2

First intuitive way to solve the problem:

WITH CTE AS (
    SELECT team_id, COUNT(team_id) AS team_mem_count
    FROM Employee
    GROUP BY team_id
)
SELECT e.employee_id, CTE.team_mem_count
FROM Employee AS e
RIGHT JOIN CTE
    ON CTE.team_id = e.team_id
ORDER BY e.employee_id;

Best Way: Case When Scalar Grouping

SELECT employee_id,
       team_id,
       CASE
           WHEN team_id IS NULL THEN 0
           ELSE (SELECT COUNT(*) FROM Employee WHERE team_id = e.team_id)
       END AS team_size
FROM Employee AS e;


In this case, the subquery acts as a scalar subquery, which can be used in the SELECT list without requiring a GROUP BY clause. The subquery is evaluated independently for each row, providing the corresponding team_size for each employee without any explicit grouping.

For example, when the correlated condition WHERE team_id = e.team_id runs, it matches the inner table's team_id values against the team_id of the current outer row (Employee AS e). So keep in mind: for each row, COUNT(*) counts only the inner rows that satisfy that condition.




Sunday 11 June 2023

Importing a CSV into MySQL

 


conda install -c anaconda sqlalchemy
pip install pandas
pip install mysqlclient  # provides the MySQLdb driver used by both ways below


 Way 1:

from sqlalchemy import create_engine as ce
import pandas as pd

# the mysql+mysqldb connection string requires the mysqlclient package
mysqlengine = ce("mysql+mysqldb://root:root@127.0.0.1:3306/cancer_data")

# header=None reads the header row as ordinary data, so we drop that first row;
# this leaves the columns labelled 0, 1, 2, ... (see Way 4 for renaming them)
df = pd.read_csv("D:/DownloadsD/new/health_statistics.csv", header=None)
df = df.iloc[1:]
# print(df.columns)

df.to_sql('health_statistics', mysqlengine, if_exists='append', index=False)


Way 2: 

import MySQLdb as m

# Connect to the MySQL database
mysql_connection = m.connect(
    user="root",
    password="root",
    database="cancer_data"
)

# Create a cursor object to execute SQL queries
cursor = mysql_connection.cursor()

# Path to your CSV file
csv_file = 'C:/ProgramData/MySQL/MySQL Server 8.0/Uploads/health_statistics.csv'

# Name of the table in the database
table_name = 'health_statistics'

# SQL query to load data from CSV into the table
load_query = f"""
    LOAD DATA INFILE '{csv_file}'
    INTO TABLE {table_name}
    FIELDS TERMINATED BY ','
    ENCLOSED BY '"'
    LINES TERMINATED BY '\n'
    IGNORE 1 ROWS
"""

# Execute the query
cursor.execute(load_query)

# Commit the changes
mysql_connection.commit()

# Close the cursor and database connection
cursor.close()
mysql_connection.close()

Way 3: 

1. Put the CSV file in the Uploads folder under MySQL's ProgramData directory (by default this is the server's secure_file_priv location, the only directory LOAD DATA INFILE is allowed to read from).

2. Create the table.

3. Use the following code, substituting your own file path and table name for the placeholders:

    LOAD DATA INFILE '{csv_file_path}'
    INTO TABLE {table_name}
    FIELDS TERMINATED BY ','
    ENCLOSED BY '"'
    LINES TERMINATED BY '\n'
    IGNORE 1 ROWS

Way 4: (when the previous ways fail)

This is similar to the first way, but one thing to remember: this step needs to be added to Way 1 if the CSV column names differ from the table's. There are multiple ways of dealing with it. We could open the CSV and rename the columns one by one, but if that is not practical to do every day, we can use the following piece of code.

column_mapping = {
    0: 'column1_name',
    1: 'column2_name',
    # Add more mappings as needed for the specific columns
}

# Rename the columns in the DataFrame using the mapping dictionary
df = df.rename(columns=column_mapping)

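Putting Way 1 and Way 4 together, a minimal sketch of the whole flow might look like this (the connection string, file path, and column names are placeholders taken from the examples above):

from sqlalchemy import create_engine as ce
import pandas as pd

# placeholder connection string and file path, reused from Way 1
mysqlengine = ce("mysql+mysqldb://root:root@127.0.0.1:3306/cancer_data")

df = pd.read_csv("D:/DownloadsD/new/health_statistics.csv", header=None)
df = df.iloc[1:]  # drop the original header row that was read in as data

# map the positional column labels to the names the MySQL table expects
# (placeholder names; replace them with your real column names)
column_mapping = {
    0: 'column1_name',
    1: 'column2_name',
}
df = df.rename(columns=column_mapping)

df.to_sql('health_statistics', mysqlengine, if_exists='append', index=False)
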
Happy Coding. 










Tuesday 6 June 2023

Elasticsearch with Airflow: creating plugins, Airflow operators, and hooks by myself

 

docker-compose-es.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.3.0
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.3.2}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.3.3
    environment:
      - "xpack.security.enabled=false"
      - "discovery.type=single-node"
      - "ES_JAVA_OPTS=-Xms750m -Xmx750m"
    ports:
      - 9200:9200

  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:


docker-compose -f docker-compose-es.yaml up -d

docker-compose -f docker-compose-es.yaml ps

docker exec -it materials-airflow-scheduler-1 /bin/bash

curl -X GET 'http://elastic:9200'

Let's create a plugin. Save the following in the plugins folder as hooks/elastic/elastic_hook.py, so it matches the import used in the DAG later:

from airflow.plugins_manager import AirflowPlugin
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch


class ElasticHook(BaseHook):
    # naming our hook here and giving it the BaseHook attributes
    def __init__(self, conn_id='elastic_default', *args, **kwargs):
        # the connection id corresponds to the Elasticsearch connection we created earlier
        super().__init__(*args, **kwargs)  # initialising the BaseHook
        conn = self.get_connection(conn_id)
        # we fetch the connection from the Airflow metadatabase;
        # it points to Elasticsearch, and we grab some information from it below

        conn_config={}
        hosts = []

        # let's verify whether we have a host in our connection
        if conn.host:
            hosts = conn.host.split(',')
            # split in case we have multiple hosts
        if conn.port:
            conn_config['port'] = int(conn.port)

        # the hosts are stored in the hosts list above; if the connection has a
        # port, we add a 'port' key to conn_config, just like assigning a
        # key/value pair to any dictionary

        if conn.login:
            conn_config['http_auth'] = (conn.login, conn.password)

        self.es = Elasticsearch(hosts, **conn_config)  # passing the values into the Elasticsearch client
        self.index = conn.schema
        # the ElasticHook is now initialised; the methods below use self.es

    def info(self):
        return self.es.info()
    # returns information about the Elasticsearch instance

    def set_index(self, index):
        self.index = index
    # sets the index, the "folder" where all the documents live

    def add_doc(self, index, doc_type, doc):
        self.set_index(index)
        # 'body' is the document payload parameter in elasticsearch-py 7.x clients
        # (8.x clients use document= and drop doc_type)
        res = self.es.index(index=index, doc_type=doc_type, body=doc)
        return res
    # adds a document into Elasticsearch under a specific index

class AirflowElasticPlugin(AirflowPlugin):
    name = 'elastic'
    hooks = [ElasticHook]
# this registers the plugin with Airflow's plugin manager
# you can change the class name, but it must inherit from AirflowPlugin

docker-compose -f docker-compose-es.yaml ps

docker exec -it materials-airflow-scheduler-1 /bin/bash

docker-compose -f docker-compose-es.yaml stop

docker-compose -f docker-compose-es.yaml up -d

docker exec -it materials-airflow-scheduler-1 /bin/bash

airflow plugins  # this command should show the elastic plugin along with the other plugins we have created so far.


Now let's call the hook inside a DAG. Create a file in the dags folder named elastic_dag.py:

from airflow import DAG
from airflow.operators.python import PythonOperator
from hooks.elastic.elastic_hook import ElasticHook
# imports the ElasticHook from the subfolder of the plugins.
from datetime import datetime

def _print_es_info():
    hook = ElasticHook()
    print(hook.info())

with DAG('elastic_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
   
    print_es_info=PythonOperator(
        task_id='print_es_info',
        python_callable=_print_es_info
    )

You can trigger the DAG, then open the task's log to see the output.
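
As a follow-up, a minimal sketch of using the hook's add_doc method from a task might look like this (the DAG name, index name, and document are hypothetical):

from airflow import DAG
from airflow.operators.python import PythonOperator
from hooks.elastic.elastic_hook import ElasticHook
from datetime import datetime

def _add_sample_doc():
    hook = ElasticHook()
    # hypothetical index name and document, just to exercise the hook
    res = hook.add_doc(index='test_index', doc_type='_doc', doc={'message': 'hello from airflow'})
    print(res)

with DAG('elastic_add_doc_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    add_sample_doc = PythonOperator(
        task_id='add_sample_doc',
        python_callable=_add_sample_doc
    )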








Branch, conditional task execution in airflow, BranchPythonOperator, trigger_rule

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=42)

def _t2(ti):
    print(ti.xcom_pull(key='my_key', task_ids='t1'))

def _branch(ti):
    value = ti.xcom_pull(key='my_key', task_ids='t1')
    if value == 42:
        return 't2'  # if the condition is met, follow t2 (and t3 gets skipped)
    return 't3'  # otherwise follow t3 (and t2 gets skipped)

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )

    branch=BranchPythonOperator(
        task_id='branch',
        python_callable=_branch
    )

    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )
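    # the branch will skip either t2 or t3; with the default all_success trigger rule
    # t4 would also be skipped, so we relax it to none_failed_min_one_success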
    t4=BashOperator(
        task_id='t4',
        bash_command="echo ''",
        trigger_rule='none_failed_min_one_success'
    )

    t1 >> branch >> [t2,t3] >> t4

More about trigger_rule? See: Airflow Trigger Rules: All you need to know! - Marc Lamberti






Using XComs in Airflow

1. I created a file at project_folder/dags/xcom_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1():
    return 42

def _t2():
    None

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3
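
A quick note on the code above: when a python_callable simply returns a value, Airflow pushes it to XCom under the default key 'return_value'. A minimal sketch of pulling that value back in the downstream task (keeping the rest of the DAG unchanged, with the same 't1'/'t2' task ids) could be:

def _t2(ti):
    # no key given, so xcom_pull falls back to the default key 'return_value'
    value = ti.xcom_pull(task_ids='t1')
    print(value)  # prints 42, the value returned by _t1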

This was the first way of pushing an XCom. Now, let's have a look at the second way of pushing an XCom.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=42)

def _t2():
    None

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3

In summary, the _t1(ti) function pushes the value 42 with the key 'my_key' to the XCom system for the specific task instance. This allows other tasks downstream in the DAG to access this value using the same key through the XCom system. The data pushed via XCom can be used for sharing information or transferring small amounts of data between tasks within the same DAG.

xcom_pull(key='my_key', task_ids='t1') is used to fetch the data from the previous push. Here is the full code below. 


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=42)

def _t2(ti):
    print(ti.xcom_pull(key='my_key', task_ids='t1'))

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1=PythonOperator(
        task_id='t1',
        python_callable= _t1
    )
    t2=PythonOperator(
        task_id='t2',
        python_callable= _t2
    )
    t3=BashOperator(
        task_id='t3',
        bash_command= "echo ''"

    )

    t1 >> t2 >> t3











Monday 5 June 2023

Basics of Apache Airflow in one piece of code

from airflow import DAG

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


import json
from pandas import json_normalize
from datetime import datetime

def _process_user(ti):
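    # pull the JSON payload that extract_user pushed to XCom, keep the first
    # result, flatten the fields we need into one row, and write it out as a CSV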
    user=ti.xcom_pull(task_ids="extract_user")
    user=user['results'][0]
    processed_user=json_normalize(
        {
            'firstname':user['name']['first'],
            'lastname':user['name']['last'],
            'country':user['location']['country'],
            'username':user['login']['username'],
            'password':user['login']['password'],
            'email':user['email']
        }
    )
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)

def _store_user():
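    # copy the CSV produced by process_user into the Postgres 'users' table,
    # using the 'postgres' connection configured in the Airflow UI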
    hook=PostgresHook(postgres_conn_id='postgres')
    hook.copy_expert(
        sql="COPY users FROM stdin WITH DELIMITER as ','",
        filename='/tmp/processed_user.csv'
    )


with DAG('user_processing', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:

    create_table=PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres', #name of the connection we will create
        sql='''
            CREATE TABLE IF NOT EXISTS users(
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL
            );
        '''
    )
    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extract_user = SimpleHttpOperator(
        task_id='extract_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
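        # response_filter parses the raw response text into JSON, so the parsed
        # dict is what gets pushed to XCom for the downstream process_user task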
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    process_user = PythonOperator(
        task_id='process_user',
        python_callable=_process_user
    )

    store_user = PythonOperator(
        task_id='store_user',
        python_callable=_store_user
    )


create_table >> is_api_available >> extract_user >> process_user >> store_user