From Raw to Refined: Building ETL Pipelines with Airflow
Building resilient ETL (Extract, Transform, Load) pipelines is the cornerstone of modern data engineering. As data volumes grow and use cases become more sophisticated, orchestrating daily, hourly, or even real-time data processes can get complex. Apache Airflow, a platform created by the community to programmatically author, schedule, and monitor workflows, has quickly become a go-to solution for managing complex data pipelines. In this blog post, we’ll walk you through the fundamentals of ETL pipelines, introduce Airflow’s features, and dive into a step-by-step approach to building everything from basic to advanced pipelines. Whether you are just starting out or looking to expand your professional-level expertise, this guide will help you build robust ETL workflows using Airflow.
Table of Contents
- What is ETL, and Why Does It Matter?
- Introduction to Airflow
- Setting Up Airflow Locally
- Core Airflow Concepts: DAGs, Tasks, and Operators
- Your First Airflow DAG
- Data Extraction in Airflow
- Data Transformation in Airflow
- Loading Data into Target Systems
- Advanced Airflow Features
- Monitoring, Logging, and Alerting
- Performance and Scalability Best Practices
- Real-World Use Cases and Project Ideas
- Conclusion
What is ETL, and Why Does It Matter?
Before diving into Airflow, let’s clarify the concept of ETL:
- Extract (E): This step involves fetching data from one or multiple sources (databases, APIs, CSV files, etc.).
- Transform (T): In this phase, we convert, clean, and shape the data. This might include operations such as deduplication, data standardization, validations, or enrichment from additional sources.
- Load (L): Finally, we move the transformed data to a destination (data warehouse, analytics system, or database) so that it can be used for reporting or analysis.
ETL is essential because it guarantees that raw data is standardized, clean, and ready for analytical or operational use. Without a solid ETL strategy, organizations quickly end up with siloed, messy data that hinders decision-making.
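To make the three phases concrete before we bring Airflow into the picture, here is a minimal, framework-free sketch in plain Python. The file paths and the `email` field are hypothetical placeholders; the rest of this post shows how Airflow orchestrates steps just like these.

```python
import csv

def extract(path="raw_users.csv"):
    # Extract: read raw rows from a source (a local CSV stands in for a database or API)
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def transform(rows):
    # Transform: clean and standardize (lowercase emails, drop rows without one)
    return [
        {**row, "email": row["email"].strip().lower()}
        for row in rows
        if row.get("email")
    ]

def load(rows, path="clean_users.csv"):
    # Load: write the cleaned rows to a destination (a file stands in for a warehouse table)
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)

if __name__ == "__main__":
    load(transform(extract()))
```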
Introduction to Airflow
Apache Airflow started as a solution within Airbnb to manage complex workflows. It is now an open-source project governed by the Apache Software Foundation. Airflow enables you to:
- Programmatically author workflows: You write Python code to define your data pipelines.
- Schedule workflows: You can schedule tasks to run periodically or on specific time intervals.
- Monitor workflows: Airflow provides a handy web-based interface to monitor workflow states, logs, and manage tasks’ execution.
Its core principles include:
- DAG-based scheduling: Airflow organizes tasks into Directed Acyclic Graphs (DAGs). Each DAG represents a complete workflow and defines the execution order of tasks.
- Dependency management: You can specify the exact order in which tasks should be executed and set dependencies for more complex scenarios.
- Scalability and extensibility: Airflow supports distributed execution via Celery or Kubernetes, and you can write custom operators and hooks to interface with nearly any system.
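As an illustration of that extensibility, here is a minimal sketch of a custom operator. The class name and the greeting logic are invented for the example; a real custom operator would typically wrap a hook or an external API in the same way.

```python
from airflow.models.baseoperator import BaseOperator


class GreetingOperator(BaseOperator):
    """A toy custom operator: logs a greeting when the task runs (illustrative only)."""

    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # execute() is called when the task instance runs;
        # its return value is pushed to XCom automatically.
        message = f"Hello, {self.name}!"
        self.log.info(message)
        return message
```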
Setting Up Airflow Locally
Prerequisites
- Python 3.7 or higher.
- A virtual environment manager like `venv` or `conda` (recommended).
- pip (to install Airflow and its dependencies).
Though Airflow can be deployed in various ways (locally, Docker, Kubernetes), starting locally is often the simplest for beginners.
Installation Steps
1. Create and activate a virtual environment. Example with `venv`:

   ```bash
   python -m venv airflow-env
   source airflow-env/bin/activate
   ```

2. Install Airflow via pip:

   ```bash
   pip install "apache-airflow[postgres,google]>=2.5,<3"
   ```

   The `[postgres,google]` part indicates additional dependencies for using Postgres and Google integrations, but you can tailor it to your environment.

3. Initialize Airflow's metadata database:

   ```bash
   airflow db init
   ```

4. Create an admin user (for the web interface):

   ```bash
   airflow users create \
     --username admin \
     --firstname Airflow \
     --lastname Admin \
     --role Admin \
     --email admin@example.com
   ```

5. Start the Airflow scheduler and webserver:

   ```bash
   airflow scheduler &
   airflow webserver
   ```

   By default, the Airflow UI will be accessible at http://localhost:8080.
Post-Installation Configuration
- airflow.cfg file: You can change default settings like `executor`, `sql_alchemy_conn`, or `web_server_port`.
- Environment variables: Airflow can read environment variables for settings, which helps when deploying to different environments (e.g., dev, staging, prod).
Core Airflow Concepts: DAGs, Tasks, and Operators
Directed Acyclic Graph (DAG)
A DAG is a collection of tasks with dependencies that define how they’re organized and in what order they should run. Each node (task) in the DAG represents work to be done, and edges represent the required sequence.
Tasks
Within a DAG, a task is a parameterized instance of one of Airflow’s classes that tells Airflow what to do (run a Python function, execute a Bash command, move files, etc.).
Operators
Operators are templates for tasks. Airflow comes with many operators out of the box:
- BashOperator: Executes a bash command.
- PythonOperator: Runs a Python function.
- PostgresOperator: Executes SQL statements in a Postgres database.
- S3ToRedshiftOperator: Moves data from S3 to Redshift.
- And many others for AWS, GCP, Spark, etc.
In addition, there are advanced concepts like Hooks (wrappers around external APIs or databases) and Sensors (tasks that wait for a certain condition to be met).
Your First Airflow DAG
Below is a simple DAG that has two tasks: one prints a welcome message, and the other prints the current date. It illustrates basic concepts like scheduling and task dependencies.
```python
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2)
}

with DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    task_hello = BashOperator(
        task_id='hello',
        bash_command='echo "Hello, Airflow!"'
    )

    task_date = BashOperator(
        task_id='show_date',
        bash_command='date'
    )

    task_hello >> task_date
```
Explanation
- default_args: Arguments passed to each task (owner, start date, retries, etc.).
- dag_id: Unique identifier for the DAG.
- schedule_interval: Defines how often the DAG runs (`@daily`, `@hourly`, a cron expression, etc.).
- task_hello >> task_date: The `>>` operator sets the execution order, ensuring `task_hello` completes before `task_date`.
Place this Python file in the `dags` folder (usually located in `~/airflow/dags` by default), and Airflow will automatically detect it. You’ll see it in the Airflow web UI and can trigger or schedule it as needed.
Data Extraction in Airflow
Common Data Sources
- Relational Databases (Postgres, MySQL, Oracle)
- Cloud Storage (AWS S3, GCS)
- SaaS APIs (Salesforce, Stripe)
- Flat Files (CSV, Parquet)
Airflow offers a variety of operators and hooks to connect to these services. Below is a short table that summarizes Airflow operators/hook pairs for some typical sources:
| Data Source | Hook/Operator | Description |
|---|---|---|
| Postgres | PostgresHook, PostgresOperator | Execute SQL statements, fetch data from tables |
| MySQL | MySqlHook, MySqlOperator | Similar usage to Postgres but for MySQL |
| S3 | S3Hook | List, get, and put files in AWS S3 buckets |
| GCS | GCSHook | Convenient methods for interacting with GCS |
| APIs | HttpHook | Make custom REST calls |
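As a quick illustration of the API row above, here is a hedged sketch that pulls JSON from a REST endpoint inside a PythonOperator callable using `HttpHook`. The connection ID `my_api`, the endpoint path, and the output file are assumptions for the example.

```python
import json

from airflow.providers.http.hooks.http import HttpHook


def extract_from_api():
    # HttpHook reads host and auth details from the 'my_api' Airflow connection (assumed)
    hook = HttpHook(method='GET', http_conn_id='my_api')
    response = hook.run(endpoint='/v1/users', headers={"Accept": "application/json"})
    users = response.json()
    # Persist the raw payload for a downstream transformation task
    with open('/tmp/api_users.json', 'w') as f:
        json.dump(users, f)
```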
Example: Extract Data from a Database
Say you have a Postgres database storing user data. You want to export a table to a CSV file in an S3 bucket daily. Below is a simplified outline of how you’d do it with Airflow:
```python
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def extract_data_from_postgres():
    pg_hook = PostgresHook(postgres_conn_id='my_postgres')
    records = pg_hook.get_pandas_df("SELECT * FROM users")
    records.to_csv('/tmp/users_data.csv', index=False)


def upload_to_s3():
    s3_hook = S3Hook(aws_conn_id='my_s3')
    s3_hook.load_file(
        '/tmp/users_data.csv',
        key='users_data.csv',
        bucket_name='my-bucket',
        replace=True
    )


default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='extract_postgres_to_s3',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id='extract_postgres',
        python_callable=extract_data_from_postgres
    )

    upload_task = PythonOperator(
        task_id='upload_to_s3',
        python_callable=upload_to_s3
    )

    extract_task >> upload_task
```
Here, we have two Python functions: one fetches data into a CSV, and the other uploads that CSV to S3. Airflow's `PostgresHook` takes care of the connection boilerplate you'd otherwise have to write yourself.
Data Transformation in Airflow
Depending on your use cases, transformations can happen in multiple ways:
- Operator-based transformations: Using built-in or custom Python logic in a PythonOperator.
- External engines: Offloading to Spark, Hadoop, or a cloud service (like AWS EMR or GCP Dataproc).
- SQL-based transformations: Running SQL on a data warehouse (e.g., BigQuery, Snowflake) via an operator.
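For the SQL-based route, a transformation can be as simple as an operator that runs a statement inside the database or warehouse itself. Below is a hedged sketch using `PostgresOperator`; the connection ID and table names are assumptions, and warehouse-specific operators for BigQuery or Snowflake follow the same pattern.

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Build a cleaned table directly in the database instead of pulling rows into Python
clean_users = PostgresOperator(
    task_id='clean_users',
    postgres_conn_id='my_postgres',   # assumed connection ID
    sql="""
        CREATE TABLE IF NOT EXISTS users_clean AS
        SELECT
            id,
            LOWER(TRIM(email)) AS email,
            CAST(sign_up_date AS DATE) AS sign_up_date
        FROM users
        WHERE email IS NOT NULL;
    """,
)
```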
Example Transformation
Let’s illustrate a simple Python transformation for normalizing some data columns:
```python
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def transform_data():
    import pandas as pd

    df = pd.read_csv('/tmp/users_data.csv')
    # Normalize columns
    df['email'] = df['email'].str.lower()
    # Convert sign-up date from string to datetime
    df['sign_up_date'] = pd.to_datetime(df['sign_up_date'], format='%Y-%m-%d')
    # Save transformed data
    df.to_csv('/tmp/users_data_transformed.csv', index=False)


default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='transform_data_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_data
    )
```
- This DAG has a single task that reads a CSV, applies transformations, and writes the result to a new CSV.
- In more advanced contexts, you could read from a database or an object store, process large batches with a distributed framework, or chain multiple transformations together in a single DAG.
Loading Data into Target Systems
Common Targets for Load
- Data Warehouse: Redshift, BigQuery, Snowflake.
- Analytical DB: Postgres, MySQL.
- NoSQL Stores: MongoDB, Cassandra.
- Data Lakes: S3 (Parquet or ORC files), HDFS.
Loading data often entails using specialized operators or writing custom scripts to push data to the destination. Below is an example using the BigQuery operator to load data from GCS:
```python
import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='load_gcs_to_bigquery',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    create_external_table = BigQueryCreateExternalTableOperator(
        task_id='create_external_table',
        bucket='my-data',
        source_objects=['users_data_transformed.csv'],
        destination_project_dataset_table='my_project.my_dataset.users_data',
        source_format='CSV',
        skip_leading_rows=1,
        field_delimiter=',',
        schema_fields=[
            {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'sign_up_date', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
        ],
        google_cloud_storage_conn_id='my_gcp_conn',
        bigquery_conn_id='my_bigquery_conn'
    )
```
This operator creates an external table in BigQuery that reads from your file in GCS. The same concept applies to other warehouses like Snowflake or Redshift, where corresponding operators exist (e.g., `SnowflakeOperator`, `RedshiftSQLOperator`).
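For Redshift specifically, the `S3ToRedshiftOperator` mentioned earlier can issue the COPY for you. Here is a minimal sketch, assuming an existing `analytics.users_data` table and the connection IDs shown (all placeholders for this example):

```python
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

load_to_redshift = S3ToRedshiftOperator(
    task_id='load_users_to_redshift',
    schema='analytics',                      # target schema (placeholder)
    table='users_data',                      # target table (placeholder)
    s3_bucket='my-bucket',
    s3_key='users_data_transformed.csv',
    redshift_conn_id='my_redshift',
    aws_conn_id='my_s3',
    copy_options=['CSV', 'IGNOREHEADER 1'],  # passed through to Redshift's COPY command
)
```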
Advanced Airflow Features
XComs (Cross-Communication)
XCom is Airflow’s built-in mechanism for sharing data between tasks. Rather than persisting data in files, you can push and pull small amounts of data (like IDs, metadata, or simple objects) across tasks.
```python
def push_data(**context):
    context['ti'].xcom_push(key='my_data', value='hello_world')


def pull_data(**context):
    result = context['ti'].xcom_pull(key='my_data', task_ids='push_task')
    print("Received data:", result)
```
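Wiring those two callables into a DAG looks like the sketch below. Note that the `task_ids` value passed to `xcom_pull` must match the pushing task's `task_id` (here `push_task`); returning a value from a callable also pushes it automatically under the key `return_value`.

```python
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# push_data and pull_data are the functions defined above

with DAG(
    dag_id='xcom_example',
    start_date=datetime.datetime(2023, 9, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    push_task = PythonOperator(
        task_id='push_task',   # must match the task_ids used in xcom_pull
        python_callable=push_data,
    )

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_data,
    )

    push_task >> pull_task
```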
Branching
Branching allows you to conditionally choose a path in your DAG. For instance, you can decide which tasks to run based on whether your data meets some criteria.
```python
from airflow.operators.python import BranchPythonOperator


def choose_branch(**context):
    # Pseudo condition
    if context['execution_date'].weekday() < 5:
        return 'week_day_task'
    else:
        return 'weekend_task'


branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=choose_branch
)
```
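The task ID returned by the callable decides which downstream branch actually runs; the other branch is skipped. A short sketch of the wiring, using `EmptyOperator` as placeholder tasks (`DummyOperator` on older Airflow versions):

```python
from airflow.operators.empty import EmptyOperator  # airflow.operators.dummy.DummyOperator before Airflow 2.3

week_day_task = EmptyOperator(task_id='week_day_task')
weekend_task = EmptyOperator(task_id='weekend_task')

# Only the task whose task_id matches choose_branch()'s return value runs;
# the other branch is marked as skipped.
branch_op >> [week_day_task, weekend_task]
```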
SubDAGs
SubDAGs are a way to nest a DAG inside another DAG for modularity. However, SubDAGs are largely replaced by Task Groups in modern Airflow versions. Task Groups provide a simpler mechanism for grouping tasks visually and logically without introducing a brand new DAG.
Task Groups
```python
from airflow.utils.task_group import TaskGroup

with DAG(...) as dag:
    with TaskGroup("extraction_group") as extraction_group:
        # define tasks for extraction
        ...
    with TaskGroup("transformation_group") as transformation_group:
        # define tasks for transformation
        ...
```
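Task Groups behave like single nodes when you set dependencies, which keeps large DAGs readable. A small self-contained sketch (the DAG ID and task names are illustrative):

```python
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id='task_group_example',
    start_date=datetime.datetime(2023, 9, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    with TaskGroup("extraction_group") as extraction_group:
        extract_users = BashOperator(task_id='extract_users', bash_command='echo extract users')
        extract_orders = BashOperator(task_id='extract_orders', bash_command='echo extract orders')

    with TaskGroup("transformation_group") as transformation_group:
        transform_all = BashOperator(task_id='transform_all', bash_command='echo transform')

    # The whole extraction group must finish before the transformation group starts
    extraction_group >> transformation_group
```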
Sensors
Sensors are simply specialized operators that wait for a certain event. For example, an `S3KeySensor` waits for a file to land in an S3 bucket before allowing downstream tasks to proceed.
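A hedged sketch of that pattern is below; the bucket, key, and connection ID are placeholders, and on older versions of the Amazon provider the import path is `airflow.providers.amazon.aws.sensors.s3_key` instead.

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_users_file',
    bucket_name='my-bucket',
    bucket_key='incoming/users_data.csv',
    aws_conn_id='my_s3',
    poke_interval=60,      # check every 60 seconds
    timeout=60 * 60 * 6,   # give up after 6 hours
    mode='reschedule',     # free the worker slot between checks
)
```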
Monitoring, Logging, and Alerting
Airflow UI
- DAGs View: A list of all the DAGs in your environment.
- Graph View: A graphical representation of tasks and their dependencies.
- Tree View: Historical runs of your DAG over time.
- Task Instance Logs: Logs from each task run.
Email and Slack Notifications
You can configure email or Slack alerts for task failures or retries. Update your `airflow.cfg` or use environment variables to set email or Slack hooks. For Slack, you'd typically use the `SlackWebhookOperator` or custom on-failure callbacks:
```python
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def slack_fail_alert(context):
    slack_msg = f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
    alert = SlackWebhookOperator(
        task_id='slack_fail',
        http_conn_id='slack_conn',
        webhook_token='YOUR_WEBHOOK_TOKEN',
        message=slack_msg,
        username='airflow'
    )
    return alert.execute(context=context)
```
And then in your task definition:
```python
PythonOperator(
    task_id='some_task',
    python_callable=my_callable,
    on_failure_callback=slack_fail_alert
)
```
Performance and Scalability Best Practices
Executors
Airflow comes with different executors to handle how tasks are run:
- SequentialExecutor: Runs one task at a time (default for local testing).
- LocalExecutor: Runs parallel tasks on a single machine.
- CeleryExecutor: Distributes tasks across multiple worker nodes.
- KubernetesExecutor: Schedules each task in a separate Kubernetes pod.
Database Optimizations
- Ensure you use a production-grade database for the Airflow metadata (e.g., Postgres or MySQL).
- Avoid SQLite in production scenarios as it can cause concurrency issues.
- Clean up old logs and XCom entries regularly to prevent the metadata DB from bloating.
Scaling with Celery or Kubernetes
- CeleryExecutor: Requires a message broker like RabbitMQ or Redis. You set up multiple worker nodes to process tasks in parallel.
- KubernetesExecutor: Each Airflow task runs in its own Kubernetes pod. This is highly dynamic and can scale well if you already have a Kubernetes cluster.
Parallelism and Concurrency
- parallelism: The max number of task instances that can run across all DAGs.
- dag_concurrency: Max number of task instances for a single DAG at once.
- max_active_runs_per_dag: Limits how many DAG runs can be active simultaneously.
Tuning these parameters in `airflow.cfg` helps manage resource usage.
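Besides the global settings, you can also cap concurrency per DAG in the DAG definition itself. A short sketch (the parameter values are illustrative; `max_active_tasks` replaced the older `concurrency` argument in Airflow 2.2):

```python
import datetime

from airflow import DAG

with DAG(
    dag_id='resource_limited_dag',
    start_date=datetime.datetime(2023, 9, 1),
    schedule_interval='@hourly',
    catchup=False,
    max_active_runs=1,      # only one run of this DAG at a time
    max_active_tasks=4,     # at most 4 task instances of this DAG in parallel
) as dag:
    # define tasks here
    ...
```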
Real-World Use Cases and Project Ideas
- Daily Data Warehouse Load: Pull data from an operational Postgres DB, transform it in Python, load it into Redshift or BigQuery.
- Event-Driven Pipelines: Trigger Airflow workflows when a file arrives in S3 or GCS.
- Machine Learning Workflows: Chain data extraction, feature engineering, model training, and model deployment steps.
- Pipeline for IoT Data: Ingest sensor data from MQTT or Kafka, perform real-time transformations, and push to a data lake or time-series DB.
- Reporting and Dashboard Refresh: Automate the generation of summary tables and refresh BI dashboards in Looker, Tableau, or Power BI.
Conclusion
Apache Airflow is a powerful orchestrator that bridges the gap between raw data and refined insights. By defining your ETL (or ELT) workflows as code, you gain clarity, version control, and the ability to scale easily over time. You’ve seen how to set up Airflow locally, build basic DAGs, connect to various data sources, implement transformations, and load the results into different targets. We also covered advanced features, best practices for performance, and real-world scenarios.
As you progress:
- Explore more operators and hooks to reduce custom boilerplate.
- Integrate Airflow with container orchestration platforms like Kubernetes for scalable workloads.
- Investigate advanced concepts like dynamic DAG generation or distributed runtime environments.
Modern data engineering demands reliable pipelines. With Airflow at the heart of your stack, you have a battle-tested, production-grade tool to unify complex processes. As you continue building and refining your ETL workflows, Airflow will be a central pillar that helps turn raw data into insightful, actionable information. Happy orchestrating!