Mastering Airflow: A Step-by-Step Guide to Automated ETL
Airflow has become the de facto standard for orchestrating and automating data pipelines in modern data engineering. Built by Airbnb in 2014, Airflow empowers data engineers with a platform to programmatically author, schedule, and monitor workflows. If you need a resilient and scalable method to run Extract, Transform, and Load (ETL) jobs, Airflow is a formidable choice.
In this guide, we’ll walk through the fundamental concepts of Airflow—covering installation, basic DAG creation, and usage of operators—before diving into more advanced topics such as sensors, subDAGs, best practices, and production-level deployments. Read on to learn how to build robust data pipelines with Airflow, step by step.
Table of Contents
- What Is Airflow?
- Key Concepts and Terminology
- Getting Started
- Building Your First Airflow DAG
- Working with Operators
- Advanced Airflow Concepts
- Managing Dependencies and Scheduling
- Monitoring and Debugging
- Scaling Airflow
- Performance Optimization and Best Practices
- Production-Ready Deployment
- Conclusion
- Further Reading
What Is Airflow?
Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Originally developed by Airbnb, it aims to provide:
- A clean, configurable, and flexible way to define tasks and their dependencies.
- A scheduler that executes tasks on an array of workers while respecting defined dependencies.
- A user-friendly web interface for managing, monitoring, and troubleshooting tasks.
Airflow follows a workflows-as-code approach, meaning you define your data pipelines (commonly referred to as “workflows” or “ETL pipelines”) in Python scripts. This approach allows you to write complex business logic without the constraints of a purely GUI-based system.
Key Concepts and Terminology
Directed Acyclic Graphs (DAGs)
In Airflow, a Directed Acyclic Graph (DAG) is a collection of tasks organized to reflect their relationships and dependencies. The “acyclic” part means the graph cannot have loops—no task can depend on itself, directly or indirectly.
- Nodes in the DAG represent tasks, and
- Edges indicate the sequence in which those tasks must be executed.
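To make this concrete, here is a minimal sketch (DAG and task names are illustrative; EmptyOperator requires Airflow 2.3+, older releases use DummyOperator) in which the >> operator draws the edges between three tasks:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; older releases use DummyOperator

# Hypothetical three-step pipeline: each >> defines an edge in the DAG
with DAG(dag_id='edges_example', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    extract = EmptyOperator(task_id='extract')
    transform = EmptyOperator(task_id='transform')
    load = EmptyOperator(task_id='load')

    # extract must finish before transform, which must finish before load
    extract >> transform >> load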
Tasks and Operators
A task is a self-contained unit of work, and an operator is a template defining what that task does. Operators come in various types:
- BashOperator: Executes a bash command.
- PythonOperator: Handles Python callable functions.
- EmailOperator: Sends email notifications.
- … (there are many more depending on your use cases and providers)
You can see operators as building blocks for your tasks. Each operator can be parameterized and orchestrated to fit your data pipeline.
Scheduling and Execution
Airflow runs your DAGs on a scheduled basis (unless you trigger them manually). The scheduler looks at the start date, interval (or cron expression), and any time-based parameters to decide when your DAG should run. If the dependencies are met, tasks are placed in the queue, and an executor manages how tasks run in parallel or sequentially based on resources.
Getting Started
Installation and Setup
It’s easy to get started with Airflow using pip:
pip install apache-airflow
However, you must be aware of some constraints and best practices:
- Using a dedicated Python virtual environment is strongly advised.
- Make sure you have a proper version of Python (3.7+ for recent Airflow releases).
- Be mindful of the Airflow version you install, as different versions have different dependencies.
Once installed, you’ll want to set the AIRFLOW_HOME environment variable (by default, Airflow uses ~/airflow as the home directory), initialize the metadata database, and create an admin user:
export AIRFLOW_HOME=~/my_airflow
airflow db init
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
Finally, start the scheduler and web server:
airflow webserver -p 8080
airflow scheduler
You can now access the Airflow UI at http://localhost:8080.
Basic Configuration Files
Airflow has a few files you should be aware of:
- airflow.cfg: The main Airflow configuration file that controls the executor type, database connections, and more.
- webserver_config.py: Contains the configuration for Airflow’s web interface.
- requirements.txt (or environment management files): For specifying additional Python packages your DAGs depend on.
You typically store your DAGs in the dags folder within your AIRFLOW_HOME.
Building Your First Airflow DAG
DAG Definition
At its simplest, a DAG is defined by creating a Python file (e.g., simple_dag.py) within your dags folder that contains a DAG object. The timing configuration is defined in the DAG initialization, along with the start date and schedule interval.
Let’s break it down with a quick example.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def say_hello():
    print("Hello from Airflow!")

with DAG(
    dag_id='simple_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    task_hello = PythonOperator(
        task_id='task_hello',
        python_callable=say_hello,
    )

    task_hello
Example: Simple DAG with PythonOperator
In the snippet above:
- We import the necessary modules (DAG, PythonOperator).
- We define a dictionary of default_args to handle aspects like retries and email notifications.
- We define a function say_hello() that prints a message.
- We create a DAG context with the with statement, specifying parameters like start_date and schedule_interval.
- We create a PythonOperator that references our say_hello() function.
Once the file is placed in your dags directory, Airflow automatically detects and parses it. You should see simple_dag in the Airflow UI under the list of DAGs.
Working with Operators
Common Operators
Airflow offers a variety of operators that can help with typical tasks. Here’s a brief table listing some commonly used operators:
| Operator | Purpose |
|---|---|
| BashOperator | Execute bash commands |
| PythonOperator | Execute Python callables |
| EmailOperator | Send emails |
| SQL operators | Execute SQL commands (e.g., PostgresOperator) |
| DockerOperator | Run Docker containers |
| KubernetesPodOperator | Run tasks in Kubernetes Pods |
Each operator can be fine-tuned based on parameters, like environment variables for BashOperator, or query strings for SQL-related operators.
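As an illustrative (not prescriptive) example, here is a minimal BashOperator parameterized with environment variables; the variable names and values are hypothetical, and a dag object is assumed to be in scope as in the earlier example:

from airflow.operators.bash import BashOperator

extract_task = BashOperator(
    task_id='extract_data',
    bash_command='echo "Extracting $SOURCE_TABLE into $TARGET_DIR"',
    env={'SOURCE_TABLE': 'orders', 'TARGET_DIR': '/tmp/exports'},  # hypothetical values
    dag=dag,
)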
Branching
Branching is a powerful feature in Airflow that allows you to control the flow of tasks. You can use an operator like BranchPythonOperator to decide which path your DAG should take based on some condition:
from airflow.operators.python import BranchPythonOperator
def _branching_logic():
    # Return the task_id of the next task to run
    if some_condition():
        return 'task_a'
    else:
        return 'task_b'
branching = BranchPythonOperator(
    task_id='branch_task',
    python_callable=_branching_logic,
    dag=dag,
)

task_a = PythonOperator(
    task_id='task_a',
    python_callable=some_callable_a,
    dag=dag,
)

task_b = PythonOperator(
    task_id='task_b',
    python_callable=some_callable_b,
    dag=dag,
)
branching >> [task_a, task_b]
Templating
Airflow’s templating feature uses Jinja to dynamically replace variables in your tasks at runtime. This is often helpful for passing run dates ({{ ds }} or {{ execution_date }}) into tasks. For example:
templated_command = """ echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}""""
task = BashOperator(
    task_id='templated_task',
    bash_command=templated_command,
    dag=dag,
)
The above snippet would print the current run date and a date 7 days in the future.
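Templating also works for operator arguments declared as template fields. A minimal sketch (the callable and task names are illustrative) passing the run date into a PythonOperator via op_kwargs:

def report_date(run_date):
    print(f"Processing data for {run_date}")

report_task = PythonOperator(
    task_id='report_task',
    python_callable=report_date,
    op_kwargs={'run_date': '{{ ds }}'},  # rendered by Jinja at runtime
    dag=dag,
)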
Advanced Airflow Concepts
Sensors
Sensors are a special type of operator that “senses” for some condition to be met before running downstream tasks. Examples include:
- FileSensor: Waits for a file to be present in a filesystem.
- ExternalTaskSensor: Waits for another DAG’s task to complete.
- S3KeySensor: Waits for a file to show up in an S3 bucket.
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/tmp/data_ready.txt',
    poke_interval=30,
    timeout=600,
    dag=dag,
)
The above sensor checks every 30 seconds if /tmp/data_ready.txt exists. If it isn’t found within 600 seconds, the task fails.
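If you run many long-waiting sensors, one common adjustment is reschedule mode, which frees the worker slot between pokes. A hedged variation of the same sensor (the intervals are illustrative):

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/tmp/data_ready.txt',
    poke_interval=300,       # check every 5 minutes
    timeout=6 * 60 * 60,     # give up after 6 hours
    mode='reschedule',       # release the worker slot between checks
    dag=dag,
)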
SubDAGs and Modular Pipelines
A SubDAG is a DAG embedded in a parent DAG, meant to provide modularity and reusability. SubDAGs help manage complex workflows by breaking them into smaller, more maintainable pieces. However, subDAGs can complicate scheduling unless designed carefully, and they are deprecated in Airflow 2.
The recommended alternative in modern Airflow is TaskGroup, which provides a more streamlined way of grouping tasks under a semantic name without the limitations of subDAGs.
Custom Operators and Plugins
If the built-in operators don’t meet your needs, you can create custom operators by extending Airflow’s BaseOperator. You can also develop plugins to package and distribute your custom functionality:
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        # Custom logic here
        self.log.info(f'My param is: {self.my_param}')
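As a usage sketch (the task ID and parameter value are illustrative, and a dag object is assumed to be in scope), the custom operator is instantiated like any built-in one:

custom_task = MyCustomOperator(
    task_id='custom_task',
    my_param='hello',   # hypothetical value passed to the operator
    dag=dag,
)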
Plugins organize custom hooks, operators, sensors, and more into a cohesive package that can be easily shared.
TaskGroup in Airflow 2
Introduced in Airflow 2, TaskGroup allows you to group tasks logically without the overhead of subDAGs. This is done by creating a grouping context:
from airflow.utils.task_group import TaskGroup
with TaskGroup("data_processing") as data_processing: task_1 = PythonOperator( task_id='task_1', python_callable=lambda: print("Task 1") ) task_2 = PythonOperator( task_id='task_2', python_callable=lambda: print("Task 2") )
task_1 >> task_2
You can then reference data_processing in the parent DAG to place it in the workflow:
start_task >> data_processing >> end_task
Managing Dependencies and Scheduling
Triggers and External Task Dependencies
Airflow allows DAGs to trigger other DAGs. You can configure triggers using operators like TriggerDagRunOperator. You might also use ExternalTaskSensor to wait for a particular task in an external DAG to complete before continuing.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_dag = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='other_dag_id',
    execution_date='{{ ds }}',
    wait_for_completion=True,
    poke_interval=30,
    dag=dag,
)
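For the reverse direction, here is a minimal ExternalTaskSensor sketch (the DAG and task IDs are illustrative; by default the sensor waits for the run of the external DAG that shares the same logical date):

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='other_dag_id',
    external_task_id='final_task',   # hypothetical task in the external DAG
    timeout=600,
    mode='reschedule',
    dag=dag,
)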
Cron Expressions and Timed Schedules
A big part of Airflow is deciding how often your DAG runs. You can use built-in Airflow strings like @daily, @hourly, or @weekly, but you can also provide a cron expression to gain full control:
# Run every day at 2 AM
schedule_interval='0 2 * * *'
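For reference, the five cron fields are minute, hour, day of month, month, and day of week. A short sketch (hypothetical DAG name) wiring the expression above into a DAG definition:

from datetime import datetime
from airflow import DAG

# '0 2 * * *'   -> every day at 02:00
# '30 6 * * 1'  -> every Monday at 06:30
# '0 */4 * * *' -> every 4 hours, on the hour
with DAG(
    dag_id='nightly_etl',              # hypothetical DAG name
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 2 * * *',
    catchup=False,
) as dag:
    ...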
Monitoring and Debugging
Airflow UI
The Airflow UI provides a graphical representation of your DAGs, their runs, and task status. You can:
- See a DAG’s structure in the Graph view.
- View a run’s status in the Tree view.
- Check logs for each task.
- Trigger manual runs.
- Pause DAGs.
Logging and Alerts
Airflow logs every task’s output by default, storing logs either on the local file system or in services like S3, depending on your configuration. You can also configure email alerts, Slack notifications, or other custom alerting methods on task failure or retries.
default_args = {
    'email_on_failure': True,
    'email': ['airflow@example.com'],
}
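Note that email alerts also require SMTP settings in airflow.cfg. For other channels, you can attach a failure callback; a minimal sketch, assuming you plug in your own notification call where the print statement is:

def notify_on_failure(context):
    # Airflow passes the task context to the callback
    ti = context['task_instance']
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed."
    # e.g., post `message` to Slack, PagerDuty, or another alerting service
    print(message)

default_args = {
    'on_failure_callback': notify_on_failure,
}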
Scaling Airflow
Executor Choices
Airflow can be scaled vertically (more CPU, more memory on a single server) or horizontally (multiple workers). The choice of executor is crucial:
- SequentialExecutor: Processes tasks sequentially (good for testing or small setups).
- LocalExecutor: Runs tasks in parallel locally, using multiple processes.
- CeleryExecutor: Distributes tasks across multiple worker nodes.
- KubernetesExecutor: Creates dynamic worker pods in Kubernetes for tasks.
Celery Executor
Celery is a distributed task queue that allows you to scale out worker processes horizontally. With the CeleryExecutor, you configure a message broker (often Redis or RabbitMQ) and result backend, enabling you to add or remove worker servers seamlessly.
Here’s a sample excerpt from airflow.cfg:
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow
Once configured, you can start workers using:
airflow celery worker
Kubernetes Executor
The KubernetesExecutor spins up a new pod for each task, providing strong isolation and per-task scaling. You’ll need a Kubernetes cluster (e.g., self-managed or a cloud service like EKS, GKE, or AKS). In your airflow.cfg:
[core]
executor = KubernetesExecutor

[kubernetes]
namespace = airflow
Then, each time a task is scheduled, it launches a new pod in the airflow namespace, pulling any necessary images.
Performance Optimization and Best Practices
Designing Efficient DAGs
- Keep DAGs small and focused: Each DAG should perform a coherent set of tasks.
- Use cross-DAG dependencies wisely: Review your design to avoid overly complex interlocking DAGs.
- Minimize overhead: Avoid running heavy computations directly in tasks. Instead, delegate such work to external systems or specialized services.
Avoiding Common Pitfalls
- Too many tasks in one DAG: Harder to manage, can slow the scheduler.
- Excessive sensor usage: Sensors that poke frequently can overload the scheduler; consider asynchronous sensors.
- Poorly tuned concurrency settings: If too low, tasks starve. If too high, you risk resource exhaustion. See the sketch after this list for the relevant DAG-level parameters.
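As a sketch of the DAG-level knobs involved (parameter names from Airflow 2.2+, where max_active_tasks replaced the older concurrency argument; imports as in the first example):

with DAG(
    dag_id='tuned_dag',        # hypothetical DAG name
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,         # only one DAG run at a time
    max_active_tasks=8,        # cap parallel task instances within this DAG
) as dag:
    ...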
Testing Strategies
Airflow offers several methods for testing:
- Local debug: Run tasks in isolation with airflow tasks test <DAG_ID> <TASK_ID> <EXECUTION_DATE>.
- Unit testing: Import your DAGs in a test file and assert certain properties (e.g., number of tasks), as sketched after this list.
- CI/CD: Airflow structures can be integrated into a CI/CD pipeline, checking DAG syntax automatically.
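A minimal unit-test sketch using Airflow’s DagBag (pytest style; the folder path and expected task count assume the simple_dag example above):

from airflow.models import DagBag

def test_simple_dag_loads():
    dag_bag = DagBag(dag_folder='dags/', include_examples=False)
    assert dag_bag.import_errors == {}      # no DAG files failed to parse
    dag = dag_bag.get_dag('simple_dag')
    assert dag is not None
    assert len(dag.tasks) == 1              # simple_dag defines a single task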
Production-Ready Deployment
Docker and Docker Compose
An easy path to production-like environments is Docker. By containerizing Airflow components (webserver, scheduler, workers), you ensure parity across environments. A simple docker-compose.yaml might look like:
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  redis:
    image: redis:latest
  airflow-webserver:
    image: apache/airflow:2.3.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: ["webserver"]
  airflow-scheduler:
    image: apache/airflow:2.3.0
    depends_on:
      - airflow-webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: ["scheduler"]
  airflow-worker:
    image: apache/airflow:2.3.0
    depends_on:
      - airflow-webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: ["celery", "worker"]
Run docker-compose up and you have a multi-container Airflow environment, complete with a Postgres database and Redis broker.
Airflow in the Cloud
If your team already uses a cloud provider, you can leverage:
- Amazon Managed Workflows for Apache Airflow (MWAA) on AWS.
- Google Cloud Composer.
- Astronomer (a managed Airflow platform).
Managed solutions simplify cluster scaling, upgrades, and resource provisioning, letting you focus on DAG creation instead of infrastructure management.
Conclusion
Airflow provides a flexible, maintainable, and scalable way to orchestrate ETL pipelines. By defining DAGs in Python, you gain the power to build complex data workflows driven by scheduling, sensors, and branching logic. You can easily integrate external systems via operators and sensors, or extend its functionality with custom operators and plugins.
As you progress:
- Start small with local, simple DAGs.
- Explore advanced scheduling, branching, and templating.
- Scale out using CeleryExecutor or KubernetesExecutor.
- Adopt best practices to avoid common pitfalls like sensor overload or extremely large DAGs.
- Consider Docker or managed cloud solutions to reduce operational overhead.
With a bit of experimentation and a vision for your data pipelines, Airflow can become the backbone of your organization’s automated ETL, ensuring data reliability, consistency, and traceability.
Further Reading
- Airflow official documentation: https://airflow.apache.org/docs/apache-airflow/stable/
- Airflow GitHub Repository: https://github.com/apache/airflow
- Astronomer Blog for Airflow tips and tutorials: https://www.astronomer.io/blog
- Cron Expressions reference: https://crontab.guru
Feel free to dive deeper into advanced features such as SLA monitoring, task lineage, and performance profiling. Mastering Airflow is a journey that leads to building highly reliable and fault-tolerant data pipelines—enjoy the ride!