DISCOVERY

January 17th, 2022

Learning the Basics of Apache Airflow

Airflow

Python

Over the last six months, I’ve used Apache Airflow extensively at work. Airflow is a platform and framework for building and automating data pipelines. Airflow data pipelines are written in Python and interoperate with many different technologies, such as databases, cloud platforms, containers, and more. Airflow is often used in the realms of data analytics and machine learning.

While Airflow data pipelines are written in Python, the software they automate and schedule does not need to be Python-related. Nonetheless, the fact that Airflow's language is Python makes data pipelines highly configurable and customizable. Since Python is very popular and easier to learn than many other languages, most engineers can pick up Airflow quickly.

There are three main objectives in this article: introducing the basic concepts of Airflow, creating an Airflow development environment, and exploring basic Airflow pipelines. The code discussed in this article is available on GitHub.

Airflow

Airflow is a platform and framework for building, automating, and scheduling data pipelines. Data pipelines in Airflow are known as workflows or Directed Acyclic Graphs (DAGs). Airflow DAGs are configured as code using Python, and can be run ad hoc or on a schedule. The Airflow platform creates an execution and scheduling environment for DAGs, which are viewable from a web interface.

The main objective of Airflow is to run DAGs either manually or based on a schedule.

Airflow DAG

A Directed Acyclic Graph (DAG), also referred to as a data pipeline or workflow, is a graph of tasks which is run manually or based on a schedule. Since DAGs are acyclic, the graph of tasks in a DAG can’t contain any cycles, but can branch and converge as needed. Airflow DAGs are written and configured in Python. DAGs contain information such as a list of tasks, the execution order (graph) of the tasks, an execution schedule, and additional metadata.
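
To make the branching and converging idea concrete, here is a minimal sketch (not taken from the article's repository, with hypothetical task names) of a DAG whose task graph fans out and then converges:

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

# A hypothetical DAG whose task graph branches and then converges.
with DAG(
    dag_id="branch_and_converge_sketch",
    start_date=days_ago(1),
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=1),
) as dag:
    extract = DummyOperator(task_id="extract")
    transform_a = DummyOperator(task_id="transform_a")
    transform_b = DummyOperator(task_id="transform_b")
    load = DummyOperator(task_id="load")

    # "extract" fans out to two transform tasks, which both converge on "load".
    extract >> [transform_a, transform_b]
    [transform_a, transform_b] >> load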

An Airflow DAG, as seen from the Airflow UI, is shown below. This DAG contains three tasks.

Airflow Task

In Airflow, tasks are the smallest unit of execution. Tasks are units within a DAG, with upstream and downstream dependencies. These upstream and downstream dependencies are other tasks in the DAG. Tasks can perform simple operations such as running a Python function or Bash script, or more complex operations like running a Docker container. There are many different types of tasks, which are created using Airflow Operators. Operators are templates for building tasks; for example, a PythonOperator is used to create a task that runs a Python function.
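
As a quick illustration, here is a sketch (with a hypothetical function and task name, not from the article's repository) of instantiating a task from the PythonOperator template, including task-level settings such as retries:

from datetime import timedelta

from airflow.operators.python import PythonOperator


# A hypothetical function for the task to run.
def extract_data():
    print("Pretend this pulls data from an upstream system")


# Instantiating the operator produces a task with a unique task_id and
# optional task-level settings such as retry behavior.
extract_task = PythonOperator(
    task_id="extract_data",
    python_callable=extract_data,
    retries=2,
    retry_delay=timedelta(minutes=5),
)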

The Airflow platform consists of multiple components; most importantly, Airflow consists of a web server, scheduler, and metastore.

Airflow Components

Web Server

In Airflow, the web server is the UI in which users can view and trigger DAGs. The web server is also helpful for viewing DAG run results and debugging DAGs by looking through execution logs.

Scheduler

The Airflow scheduler is a constantly running program that monitors all DAGs in the Airflow environment. Python DAG files exist in a specific directory in the Airflow environment, and the scheduler is responsible for parsing these DAG files and storing information about them within the Airflow metastore. The scheduler also checks whether DAGs and the tasks within them are eligible for execution. When tasks are eligible for execution, the scheduler places them in a queued state and then executes them. In many ways, the scheduler is the heart of the Airflow platform.

Metastore

The Airflow metastore holds metadata about an Airflow environment, including configuration details and DAG information. Everything that happens within the Airflow environment also exists in the metastore, including DAG run information. The metastore is a relational database, commonly MySQL or PostgreSQL. This article provides a good high-level overview of the data stored in the metastore.

After you sign into the Airflow UI, the initial page displays all the DAGs in the Airflow environment.

This list of DAGs displays basic information about each DAG, such as their execution schedules, and the results of recent runs. It also gives options to toggle DAGs on and off (the switch to the left of the DAG name) and run DAGs (the play button in the "Actions" column). Clicking on a DAG shows the following view:

Airflow DAGs have multiple views; the view shown above is called the graph view. The graph view shows the DAG and the result of the previous run. In this case, both tasks ran successfully, as denoted by both tasks being outlined in green. Hovering over a task supplies more information about it, and clicking on the task provides options such as viewing the logs or re-running the task.

Clicking on the "Log" button displays the logs for the task run, which is very useful for debugging.

Another useful page is a DAG's tree view. This page shows the results of all prior DAG runs. In the image below, the last two runs of the hello_world DAG are shown, both of which were successful.

Airflow provides many ways to view DAGs and environment configurations, but the pages shown above are the ones I’ve found most useful these past six months.

While it is possible to run Airflow on the host machine of your development environment, a more elegant approach is to use Docker. With Docker, you have an Airflow environment that works across different operating systems and is started with a single command. No Airflow dependencies are needed on your host machine with this approach. Since Airflow often has a complex setup with multiple containers, I use Docker Compose to orchestrate them.

I’ve created multiple Airflow development environments of varying degrees of complexity. The major difference between these environments comes down to the executor, which is the component of Airflow that runs scheduled tasks. The three development environments I created, which can be found in my data-analytics-prototypes repository, use the sequential, local, and Celery executors.

The simplest executor is the sequential executor, which is not recommended for production usage. Even for development use it can become a bottleneck because it runs tasks sequentially, one at a time. However, when you are just getting started, the sequential executor is likely sufficient.

The configuration for the sequential executor local environment consists of a Dockerfile and a docker-compose.yml file. The contents of these files are shown below.

# Dockerfile
FROM apache/airflow:2.2.0-python3.8

RUN airflow db init \
    && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org
# docker-compose.yml
version: '3.8'

x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
  - _PIP_ADDITIONAL_REQUIREMENTS=apache-airflow-providers-postgres==2.3.0

services:
  airflow:
    build:
      dockerfile: Dockerfile
      context: .
    environment: *airflow_environment
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
      - ../dags:/opt/airflow/dags
    networks:
      - airflow-net
    entrypoint: /bin/bash
    command: -c 'airflow webserver & airflow scheduler'

volumes:
  logs:

networks:
  airflow-net:
    driver: bridge

The Docker Compose file runs a container based on the Dockerfile. With this setup, a simple docker-compose up command from the command line will start the Airflow server.

Let’s go over these files in a bit more detail. The Dockerfile uses an official Airflow image, apache/airflow:2.2.0-python3.8, as its base image. The RUN command initializes the Airflow metastore database (airflow db init) and creates a user that can sign into the Airflow webserver (airflow users create). For the sequential executor, the Airflow metastore uses SQLite as its database engine.

The docker-compose.yml file runs a single airflow service using the Dockerfile, as specified by dockerfile: Dockerfile. The airflow service starts a container that executes the command airflow webserver & airflow scheduler. This command starts the Airflow webserver in the background and the Airflow scheduler in the foreground. The Airflow webserver is exposed on port 8080 and is accessible locally at http://localhost:8080/.

There are two volumes attached to the container. The first volume holds Airflow logs and is specified by logs:/opt/airflow/logs. The second volume maps a location in my local filesystem that holds Airflow DAGs, specified by ../dags:/opt/airflow/dags. The relative path from my docker-compose.yml file to my dags directory is ../dags. This dags directory is mounted in the container at /opt/airflow/dags, the directory that the Airflow scheduler reads DAGs from.

The Docker Compose setup also includes environment variables that configure Airflow. These environment variables are defined under x-environment: &airflow_environment and are attached to the container with the environment: *airflow_environment configuration. AIRFLOW__CORE__EXECUTOR sets the type of executor that Airflow uses. Setting AIRFLOW__CORE__LOAD_EXAMPLES=False tells Airflow to exclude example DAGs from the Airflow environment; Airflow loads example DAGs for reference by default. Setting AIRFLOW__CORE__STORE_DAG_CODE=True stores DAG files in the Airflow metastore and setting AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True serializes the DAGs when storing them in the metastore. AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True allows users to view the Airflow configuration from the web server. _PIP_ADDITIONAL_REQUIREMENTS is used to install additional Python dependencies in the Airflow environment. In my case, I install a single additional dependency, apache-airflow-providers-postgres. This library supplies Airflow operators (templates for Airflow tasks) that work with a PostgreSQL database.

Development environments using the local executor and the Celery executor build upon the Docker configuration for the sequential executor. The local executor operates on a single machine, similar to the sequential executor. However, unlike the sequential executor, which runs only one task at a time, the local executor runs multiple tasks in parallel. The Celery executor not only runs tasks in parallel, but can also distribute tasks across multiple machines. The local executor environment and the Celery executor environment Docker Compose files are available on GitHub.

Let’s shift our attention to building pipelines (DAGs) in Airflow. All the DAGs in my Airflow environment exist in a dags directory. To follow along, you can run a docker-compose up command from the Airflow/sequential-executor directory of my repository and navigate to http://localhost:8080/ in a web browser.

To get started, here is a "Hello World" style DAG that contains two simple tasks.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}


def task():
    print(f"The current time is {datetime.now().strftime('%b. %d, %Y %-I:%M %p UTC')}")


with DAG(
    dag_id="hello_world",
    description="A hello world DAG which shows the basic execution flow of Airflow",
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval=None,
    default_view="graph",
    tags=["sample", "python", "bash"]
) as dag:
    bash_task = BashOperator(task_id='bash_task', bash_command='echo "Hello from Airflow!"')
    python_task = PythonOperator(task_id='python_task', python_callable=task)

    bash_task >> python_task

Airflow contains a DAG class, which is used to create a DAG. One way to initialize a DAG is the with DAG(...) as dag: syntax shown in the code above. The tasks that make up the DAG are defined inside the with statement. The DAG above has two tasks: bash_task and python_task. bash_task runs a Bash script, and python_task runs a Python function. The order in which these tasks run is defined with the bash_task >> python_task operator syntax. This line says that bash_task runs first, followed by python_task. More details about ordering tasks (task relationships) can be found in the Airflow documentation.
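
For reference, the same relationship between the two tasks could be declared in a few equivalent ways. The sketch below assumes the bash_task and python_task defined in the hello_world DAG above:

# Each of these lines declares the same dependency: bash_task before python_task.
bash_task >> python_task                # bitshift syntax used in the DAG above
python_task << bash_task                # the reversed operator reads the other way
bash_task.set_downstream(python_task)   # explicit method form
python_task.set_upstream(bash_task)     # same relationship, declared from the other side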

bash_task is created with BashOperator(). BashOperator() takes a bash_command argument containing a command to run in a Bash shell. The command, echo "Hello from Airflow!", prints "Hello from Airflow!" to the Airflow logs. python_task is created with PythonOperator(). PythonOperator() takes a python_callable argument, which is a Python function. The Python function, task(), prints the current time to the Airflow logs.
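
PythonOperator() can also pass arguments to its callable through the op_args and op_kwargs parameters. Here is a small sketch with a hypothetical function and task, not part of the hello_world DAG:

from airflow.operators.python import PythonOperator


# A hypothetical function that accepts arguments.
def greet(name, punctuation="!"):
    print(f"Hello from {name}{punctuation}")


greet_task = PythonOperator(
    task_id="greet_task",
    python_callable=greet,
    op_args=["Airflow"],               # positional arguments passed to greet()
    op_kwargs={"punctuation": "!!"},   # keyword arguments passed to greet()
)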

The DAG() initializer takes arguments that configure the DAG. The DAG is given the name hello_world (dag_id="hello_world"), provided a description (description="..."), and given default arguments (default_args=default_args). Default arguments are passed along to all the tasks within a DAG. Similar to DAGs, tasks take arguments to alter their configuration.
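
For example, entries added to default_args apply to every task in the DAG unless a task overrides them. The retry settings in this sketch are hypothetical and not part of the hello_world DAG:

from datetime import timedelta

# These defaults would apply to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,                         # each task retries once by default...
    'retry_delay': timedelta(minutes=5),  # ...waiting five minutes between attempts
}

# An individual task can still override a default, for example:
# flaky_task = BashOperator(task_id='flaky_task', bash_command='echo hi', retries=3)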

The DAG is also given a timeout of two hours (dagrun_timeout=timedelta(hours=2)). If the DAG runs for more than two hours before completing, it is stopped and marked as a failure. The DAG is not given a schedule interval, meaning it is never triggered automatically (schedule_interval=None). default_view="graph" means that when clicking on the DAG in the Airflow web server, the graph view is shown by default (as shown in the image above). tags=["sample", "python", "bash"] attaches three tags to the DAG. Tags are searchable in the DAG list page of the web server, allowing you to easily find data pipelines as the number of DAGs grows. For example, the following screenshot shows the result of filtering by the "python" tag.

Now let’s look at a slightly more complex DAG. A branch DAG is shown below.

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago


def branch():
    if datetime.now().weekday() >= 5:
        return 'weekend_task'
    else:
        return 'weekday_task'


def weekend():
    print(
        "Schedule:\n"
        "8 AM - 12 PM: Run & Workout\n"
        "12 PM - 10 PM: Code & Relax"
    )


def weekday():
    print(
        "Schedule:\n"
        "6 AM - 9 AM: Run & Workout\n"
        "9 AM - 5 PM: Work\n"
        "5 PM - 10 PM: Code & Relax"
    )


default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

with DAG(
    dag_id="branch",
    description="A DAG that branches",
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval="@daily",
    default_view="graph",
    is_paused_upon_creation=False,
    tags=["sample", "branch", "python"]
) as dag:
    branch_task = BranchPythonOperator(task_id='branch', python_callable=branch)
    weekend_task = PythonOperator(task_id='weekend_task', python_callable=weekend)
    weekday_task = PythonOperator(task_id='weekday_task', python_callable=weekday)

    branch_task >> [weekend_task, weekday_task]

This DAG contains three tasks, all of which run Python functions. The DAG is configured similar to the hello_world DAG, except that there is now a schedule interval. The interval "@daily" says that the DAG is automatically triggered every day. Airflow provides different types of schedule intervals; specifically, there are preset, cron, and frequency intervals. @daily is a preset interval; other examples of preset intervals include @hourly, @weekly, and @monthly. I have example DAGs of preset, cron, and frequency intervals in my GitHub repository.
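
As a sketch, the three styles of schedule interval look like this when passed to a DAG's schedule_interval argument (the cron and frequency values here are arbitrary examples, not from the article's DAGs):

from datetime import timedelta

schedule_interval = "@daily"            # preset interval: once a day at midnight
schedule_interval = "0 6 * * 1-5"       # cron interval: 6 AM on weekdays
schedule_interval = timedelta(hours=4)  # frequency interval: every four hours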

The other major difference between the hello_world DAG and the branch DAG is that the tasks in branch follow a branching pattern. That is, a task in the DAG has two downstream tasks, creating a branch in the task dependency tree. In the branch DAG, the branch_task task has two downstream tasks: weekend_task and weekday_task. This relationship is defined by the line branch_task >> [weekend_task, weekday_task].

I also introduced a new operator in this DAG: BranchPythonOperator(). BranchPythonOperator() runs a Python function that returns the name of a single task or a list of tasks. The tasks returned by the Python function are triggered as downstream dependencies of the current task. BranchPythonOperator() provides flexibility to DAGs, allowing tasks to be triggered conditionally.

In my branch DAG, BranchPythonOperator() takes a python_callable argument with the value of a branch() function. This function returns the task name weekend_task if the current day is a weekend, otherwise it returns the task name weekday_task. In other words, weekend_task is triggered on weekends, while weekday_task is triggered on weekdays.
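
Since a branch callable can also return a list of task names, a slight variation of branch() could trigger more than one downstream task. The extra task name below is hypothetical and not part of the branch DAG:

from datetime import datetime


def branch():
    # On weekends, trigger two downstream tasks instead of one.
    if datetime.now().weekday() >= 5:
        return ['weekend_task', 'chores_task']
    else:
        return 'weekday_task'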

Airflow provides far too many options for configuring workflows to cover in a single article, or even an entire book. The Airflow documentation has a lot of useful information for working with Airflow. If you are looking to truly master Airflow, I recommend reading the book Data Pipelines with Apache Airflow.

Airflow is a useful platform for building data pipelines. It’s been production-tested, handling data workloads at large companies in the software engineering industry. All the code shown in this article is available on GitHub.