From Raw to Refined: Building ETL Pipelines with Airflow#

Building resilient ETL (Extract, Transform, Load) pipelines is the cornerstone of modern data engineering. As data volumes grow and use cases become more sophisticated, orchestrating daily, hourly, or even real-time data processes can get complex. Apache Airflow, a platform created by the community to programmatically author, schedule, and monitor workflows, has quickly become a go-to solution for managing complex data pipelines. In this blog post, we’ll walk you through the fundamentals of ETL pipelines, introduce Airflow’s features, and dive into a step-by-step approach to building everything from basic to advanced pipelines. Whether you are just starting out or looking to expand your professional-level expertise, this guide will help you build robust ETL workflows using Airflow.


Table of Contents#

  1. What is ETL, and Why Does It Matter?
  2. Introduction to Airflow
  3. Setting Up Airflow Locally
  4. Core Airflow Concepts: DAGs, Tasks, and Operators
  5. Your First Airflow DAG
  6. Data Extraction in Airflow
  7. Data Transformation in Airflow
  8. Loading Data into Target Systems
  9. Advanced Airflow Features
  10. Monitoring, Logging, and Alerting
  11. Performance and Scalability Best Practices
  12. Real-World Use Cases and Project Ideas
  13. Conclusion

What is ETL, and Why Does It Matter?#

Before diving into Airflow, let’s clarify the concept of ETL:

  • Extract (E): This step involves fetching data from one or multiple sources (databases, APIs, CSV files, etc.).
  • Transform (T): In this phase, we convert, clean, and shape the data. This might include operations such as deduplication, data standardization, validations, or enrichment from additional sources.
  • Load (L): Finally, we move the transformed data to a destination (data warehouse, analytics system, or database) so that it can be used for reporting or analysis.

ETL is essential because it guarantees that raw data is standardized, clean, and ready for analytical or operational use. Without a solid ETL strategy, organizations quickly end up with siloed, messy data that hinders decision-making.
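
To make the three stages concrete, here is a minimal, purely illustrative sketch in plain Python (the file paths and column names are hypothetical, and no Airflow is involved yet):

import pandas as pd

def extract() -> pd.DataFrame:
    # Extract: pull raw records from a source (here, a CSV file)
    return pd.read_csv("/tmp/raw_users.csv")

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Transform: deduplicate and standardize the data
    df = df.drop_duplicates(subset=["id"])
    df["email"] = df["email"].str.lower()
    return df

def load(df: pd.DataFrame) -> None:
    # Load: write the refined data to a destination (here, another CSV)
    df.to_csv("/tmp/refined_users.csv", index=False)

load(transform(extract()))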


Introduction to Airflow#

Apache Airflow started as a solution within Airbnb to manage complex workflows. It is now an open-source project governed by the Apache Software Foundation. Airflow enables you to:

  • Programmatically author workflows: You write Python code to define your data pipelines.
  • Schedule workflows: You can schedule DAGs to run periodically, at fixed intervals, or on cron expressions.
  • Monitor workflows: Airflow provides a handy web-based interface to monitor workflow states, inspect logs, and manage task execution.

Its core principles include:

  • DAG-based scheduling: Airflow organizes tasks into Directed Acyclic Graphs (DAGs). Each DAG represents a complete workflow and defines the execution order of tasks.
  • Dependency management: You can specify the exact order in which tasks should be executed and set dependencies for more complex scenarios.
  • Scalability and extensibility: Airflow supports distributed execution via Celery or Kubernetes, and you can write custom operators and hooks to interface with nearly any system.

Setting Up Airflow Locally#

Prerequisites#

  1. Python 3.7 or higher.
  2. A virtual environment manager like venv or conda (recommended).
  3. pip (to install Airflow and its dependencies).

Though Airflow can be deployed in various ways (locally, Docker, Kubernetes), starting locally is often the simplest for beginners.

Installation Steps#

  1. Create and activate a virtual environment.
    Example with venv:

    python -m venv airflow-env
    source airflow-env/bin/activate
  2. Install Airflow via pip.

    pip install "apache-airflow[postgres,google]>=2.5,<3"

    The [postgres,google] extras pull in additional dependencies for the Postgres and Google integrations; tailor the list to your environment.

  3. Initialize Airflow’s metadata database.

    airflow db init
  4. Create an admin user (for the web interface).

    airflow users create \
    --username admin \
    --firstname Airflow \
    --lastname Admin \
    --role Admin \
    --email admin@example.com
  5. Start the Airflow scheduler and webserver.

    airflow scheduler &
    airflow webserver

    By default, the Airflow UI will be accessible at http://localhost:8080.

Post-Installation Configuration#

  • airflow.cfg file: You can change default settings like executor, sql_alchemy_conn, or web_server_port.
  • Environment variables: Airflow can read settings from environment variables, which helps when deploying to different environments (e.g., dev, staging, prod); an example follows below.
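
For example, Airflow maps environment variables of the form AIRFLOW__{SECTION}__{KEY} onto airflow.cfg settings, so a deployment script might override the executor, metadata database, and web server port like this (the connection string is a placeholder; in releases before 2.3 the connection setting lives under [core] rather than [database]):

export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
export AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080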

Core Airflow Concepts: DAGs, Tasks, and Operators#

Directed Acyclic Graph (DAG)#

A DAG is a collection of tasks with dependencies that define how they’re organized and in what order they should run. Each node (task) in the DAG represents work to be done, and edges represent the required sequence.

Tasks#

Within a DAG, a task is a parameterized instance of an operator (or sensor) that tells Airflow what to do (run a Python function, execute a Bash command, move files, etc.).

Operators#

Operators are templates for tasks. Airflow comes with many operators out of the box:

  • BashOperator: Executes a bash command.
  • PythonOperator: Runs a Python function.
  • PostgresOperator: Executes SQL statements in a Postgres database.
  • S3ToRedshiftOperator: Moves data from S3 to Redshift.
  • And many others for AWS, GCP, Spark, etc.

In addition, there are advanced concepts like Hooks (wrappers around external APIs or databases) and Sensors (tasks that wait for a certain condition to be met).


Your First Airflow DAG#

Below is a simple DAG that has two tasks: one prints a welcome message, and the other prints the current date. It illustrates basic concepts like scheduling and task dependencies.

simple_dag.py
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2)
}

with DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    task_hello = BashOperator(
        task_id='hello',
        bash_command='echo "Hello, Airflow!"'
    )

    task_date = BashOperator(
        task_id='show_date',
        bash_command='date'
    )

    task_hello >> task_date

Explanation#

  • default_args: Arguments passed to each task (owner, start date, retries, etc.).
  • dag_id: Unique identifier for the DAG.
  • schedule_interval: Defines how often the DAG runs (@daily, @hourly, a cron expression, etc.).
  • task_hello >> task_date: The “>>” sets the execution order, ensuring task_hello completes before task_date.

Place this Python file in your dags folder ($AIRFLOW_HOME/dags, which defaults to ~/airflow/dags), and Airflow will automatically detect it. You'll see it in the Airflow web UI and can trigger or schedule it as needed.


Data Extraction in Airflow#

Common Data Sources#

  • Relational Databases (Postgres, MySQL, Oracle)
  • Cloud Storage (AWS S3, GCS)
  • SaaS APIs (Salesforce, Stripe)
  • Flat Files (CSV, Parquet)

Airflow offers a variety of operators and hooks to connect to these services. Below is a short table that summarizes Airflow operators/hook pairs for some typical sources:

| Data Source | Hook/Operator                  | Description                                    |
| ----------- | ------------------------------ | ---------------------------------------------- |
| Postgres    | PostgresHook, PostgresOperator | Execute SQL statements, fetch data from tables |
| MySQL       | MySqlHook, MySqlOperator       | Similar usage to Postgres but for MySQL        |
| S3          | S3Hook                         | List, get, put files in AWS S3 buckets         |
| GCS         | GCSHook                        | Convenient methods for interacting with GCS    |
| APIs        | HttpHook                       | Make custom REST calls                         |

Example: Extract Data from a Database#

Say you have a Postgres database storing user data. You want to export a table to a CSV file in an S3 bucket daily. Below is a simplified outline of how you’d do it with Airflow:

extract_postgres_to_s3.py
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def extract_data_from_postgres():
    pg_hook = PostgresHook(postgres_conn_id='my_postgres')
    records = pg_hook.get_pandas_df("SELECT * FROM users")
    records.to_csv('/tmp/users_data.csv', index=False)

def upload_to_s3():
    s3_hook = S3Hook(aws_conn_id='my_s3')
    s3_hook.load_file(
        '/tmp/users_data.csv',
        key='users_data.csv',
        bucket_name='my-bucket',
        replace=True
    )

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='extract_postgres_to_s3',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id='extract_postgres',
        python_callable=extract_data_from_postgres
    )

    upload_task = PythonOperator(
        task_id='upload_to_s3',
        python_callable=upload_to_s3
    )

    extract_task >> upload_task

Here, we have two Python functions: one to fetch data into a CSV, and another to upload that CSV to S3. Airflow's PostgresHook and S3Hook spare you the connection boilerplate you'd otherwise write for the database and for AWS.
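
Extraction from a SaaS or REST API follows the same pattern, just with a different hook. Below is a hedged sketch using HttpHook from the HTTP provider; the connection ID, endpoint, and output path are assumptions for illustration:

import json

from airflow.providers.http.hooks.http import HttpHook

def extract_data_from_api():
    # 'my_api' is a hypothetical HTTP connection defined in Airflow
    http_hook = HttpHook(method='GET', http_conn_id='my_api')
    response = http_hook.run(endpoint='/v1/users', headers={'Accept': 'application/json'})
    # Persist the raw payload so a downstream task can transform it
    with open('/tmp/api_users.json', 'w') as f:
        json.dump(response.json(), f)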


Data Transformation in Airflow#

Depending on your use cases, transformations can happen in multiple ways:

  1. Operator-based transformations: Using built-in or custom Python logic in a PythonOperator.
  2. External engines: Offloading to Spark, Hadoop, or a cloud service (like AWS EMR or GCP Dataproc).
  3. SQL-based transformations: Running SQL on a data warehouse (e.g., BigQuery, Snowflake) via an operator.

Example Transformation#

Let’s illustrate a simple Python transformation for normalizing some data columns:

transform_data.py
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def transform_data():
    import pandas as pd

    df = pd.read_csv('/tmp/users_data.csv')
    # Normalize columns
    df['email'] = df['email'].str.lower()
    # Convert sign-up date from string to datetime
    df['sign_up_date'] = pd.to_datetime(df['sign_up_date'], format='%Y-%m-%d')
    # Save transformed data
    df.to_csv('/tmp/users_data_transformed.csv', index=False)

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='transform_data_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_data
    )

  • This DAG has a single task that reads a CSV, applies transformations, and writes the result to a new CSV.
  • In more advanced contexts, you could read from a database or an object store, process large batches with a distributed framework, or chain multiple transformations together in a single DAG; the SQL-based variant is sketched below.
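
For the SQL-based route, the transformation can be pushed down to the database itself. A minimal sketch with PostgresOperator, assuming a my_postgres connection and a hypothetical users_clean target table:

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Defined inside a `with DAG(...)` block, like the examples above
clean_users = PostgresOperator(
    task_id='clean_users',
    postgres_conn_id='my_postgres',  # assumed connection ID
    sql="""
        INSERT INTO users_clean (id, name, email, sign_up_date)
        SELECT id, name, LOWER(email), sign_up_date::date
        FROM users
        ON CONFLICT (id) DO NOTHING;
    """,
)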

Loading Data into Target Systems#

Common Targets for Load#

  • Data Warehouse: Redshift, BigQuery, Snowflake.
  • Analytical DB: Postgres, MySQL.
  • NoSQL Stores: MongoDB, Cassandra.
  • Data Lakes: S3 (Parquet or ORC files), HDFS.

Loading data often entails using specialized operators or writing custom scripts to push data to the destination. Below is an example using the BigQuery operator to load data from GCS:

load_to_bigquery.py
import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime.datetime(2023, 9, 1),
}

with DAG(
    dag_id='load_gcs_to_bigquery',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id='create_external_table',
        bucket='my-data',
        source_objects=['users_data_transformed.csv'],
        destination_project_dataset_table='my_project.my_dataset.users_data',
        source_format='CSV',
        skip_leading_rows=1,
        field_delimiter=',',
        schema_fields=[
            {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'sign_up_date', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
        ],
        google_cloud_storage_conn_id='my_gcp_conn',
        bigquery_conn_id='my_bigquery_conn'
    )

This operator creates an external table in BigQuery reading from your file in GCS. The same concept applies to other warehouses like Snowflake or Redshift, where corresponding operators exist (e.g., SnowflakeOperator, RedshiftSQLOperator).
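
As a rough sketch of the Redshift path, the Amazon provider ships an S3ToRedshiftOperator that wraps Redshift's COPY command; the connection IDs, schema, and table below are assumptions:

from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

# Defined inside a `with DAG(...)` block
load_users_to_redshift = S3ToRedshiftOperator(
    task_id='load_users_to_redshift',
    schema='analytics',                 # assumed target schema
    table='users_data',                 # assumed target table
    s3_bucket='my-bucket',
    s3_key='users_data_transformed.csv',
    redshift_conn_id='my_redshift',     # assumed connection IDs
    aws_conn_id='my_s3',
    copy_options=['CSV', 'IGNOREHEADER 1'],
)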


Advanced Airflow Features#

XComs (Cross-Communication)#

XCom is Airflow’s built-in mechanism for sharing data between tasks. Rather than persisting data in files, you can push and pull small amounts of data (like IDs, metadata, or simple objects) across tasks.

def push_data(**context):
    context['ti'].xcom_push(key='my_data', value='hello_world')

def pull_data(**context):
    result = context['ti'].xcom_pull(key='my_data', task_ids='push_task')
    print("Received data:", result)

Branching#

Branching allows you to conditionally choose a path in your DAG. For instance, you can decide which tasks to run based on whether your data meets some criteria.

from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    # Pseudo condition
    if context['execution_date'].weekday() < 5:
        return 'week_day_task'
    else:
        return 'weekend_task'

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=choose_branch
)
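
The callable must return the task_id (or list of task_ids) to follow, and those tasks must be direct downstream dependencies of the branch operator. A minimal sketch with placeholder tasks:

from airflow.operators.empty import EmptyOperator

# Inside a `with DAG(...)` block; EmptyOperator is available from Airflow 2.3
week_day_task = EmptyOperator(task_id='week_day_task')
weekend_task = EmptyOperator(task_id='weekend_task')

branch_op >> [week_day_task, weekend_task]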

SubDAGs#

SubDAGs are a way to nest a DAG inside another DAG for modularity. However, SubDAGs are largely replaced by Task Groups in modern Airflow versions. Task Groups provide a simpler mechanism for grouping tasks visually and logically without introducing a brand new DAG.

Task Groups#

from airflow.utils.task_group import TaskGroup

with DAG(...) as dag:
    with TaskGroup("extraction_group") as extraction_group:
        # define tasks for extraction
        ...

    with TaskGroup("transformation_group") as transformation_group:
        # define tasks for transformation
        ...

Sensors#

Sensors are simply specialized operators that wait for a certain event. For example, an S3KeySensor waits for a file to land in an S3 bucket before allowing downstream tasks to proceed.
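
A minimal sketch of that pattern, assuming the my_s3 connection and bucket used earlier in this post:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Inside a `with DAG(...)` block
wait_for_users_file = S3KeySensor(
    task_id='wait_for_users_file',
    bucket_name='my-bucket',
    bucket_key='users_data.csv',
    aws_conn_id='my_s3',
    poke_interval=60,       # check every minute
    timeout=6 * 60 * 60,    # give up after six hours
)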


Monitoring, Logging, and Alerting#

Airflow UI#

  • DAGs View: A list of all the DAGs in your environment.
  • Graph View: A graphical representation of tasks and their dependencies.
  • Tree View: Historical runs of your DAG over time.
  • Task Instance Logs: Logs from each task run.

Email and Slack Notifications#

You can configure email or Slack alerts for task failures or retries. Email alerts use the SMTP settings in airflow.cfg (or the corresponding environment variables) together with the email and email_on_failure task arguments. For Slack, you'd typically use the SlackWebhookOperator or a custom on_failure_callback:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def slack_fail_alert(context):
    slack_msg = f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
    alert = SlackWebhookOperator(
        task_id='slack_fail',
        http_conn_id='slack_conn',
        webhook_token='YOUR_WEBHOOK_TOKEN',
        message=slack_msg,
        username='airflow'
    )
    return alert.execute(context=context)

And then in your task definition:

PythonOperator(
    task_id='some_task',
    python_callable=my_callable,
    on_failure_callback=slack_fail_alert
)

Performance and Scalability Best Practices#

Executors#

Airflow comes with different executors to handle how tasks are run:

  • SequentialExecutor: Runs one task at a time (default for local testing).
  • LocalExecutor: Runs parallel tasks on a single machine.
  • CeleryExecutor: Distributes tasks across multiple worker nodes.
  • KubernetesExecutor: Schedules each task in a separate Kubernetes pod.

Database Optimizations#

  • Ensure you use a production-grade database for the Airflow metadata (e.g., Postgres or MySQL).
  • Avoid SQLite in production scenarios as it can cause concurrency issues.
  • Clean up old logs and XCom entries regularly to prevent the metadata DB from bloating; the airflow db clean command sketched below can help.
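
A minimal sketch of that cleanup, assuming Airflow 2.3 or later where the airflow db clean command is available (the cutoff date is illustrative):

# Preview what would be deleted, then run the cleanup for real
airflow db clean --clean-before-timestamp '2023-06-01' --dry-run
airflow db clean --clean-before-timestamp '2023-06-01'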

Scaling with Celery or Kubernetes#

  • CeleryExecutor: Requires a message broker like RabbitMQ or Redis. You set up multiple worker nodes to process tasks in parallel.
  • KubernetesExecutor: Each Airflow task runs in its own Kubernetes pod. This is highly dynamic and can scale well if you already have a Kubernetes cluster.

Parallelism and Concurrency#

  • parallelism: The max number of task instances that can run across all DAGs.
  • dag_concurrency: Max number of task instances for a single DAG at once (renamed max_active_tasks_per_dag in Airflow 2.2+).
  • max_active_runs_per_dag: Limits how many DAG runs can be active simultaneously.

Tuning these parameters in airflow.cfg (or through the corresponding environment variables, as sketched below) helps manage resource usage.
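
For instance, the same settings can be supplied as environment variables; these keys correspond to the [core] section in Airflow 2.2+ (older releases use dag_concurrency instead of max_active_tasks_per_dag):

export AIRFLOW__CORE__PARALLELISM=32
export AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16
export AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=4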


Real-World Use Cases and Project Ideas#

  1. Daily Data Warehouse Load: Pull data from an operational Postgres DB, transform it in Python, load it into Redshift or BigQuery.
  2. Event-Driven Pipelines: Trigger Airflow workflows when a file arrives in S3 or GCS.
  3. Machine Learning Workflows: Chain data extraction, feature engineering, model training, and model deployment steps.
  4. Pipeline for IoT Data: Ingest sensor data from MQTT or Kafka, perform real-time transformations, and push to a data lake or time-series DB.
  5. Reporting and Dashboard Refresh: Automate the generation of summary tables and refresh BI dashboards in Looker, Tableau, or Power BI.

Conclusion#

Apache Airflow is a powerful orchestrator that bridges the gap between raw data and refined insights. By defining your ETL (or ELT) workflows as code, you gain clarity, version control, and the ability to scale easily over time. You’ve seen how to set up Airflow locally, build basic DAGs, connect to various data sources, implement transformations, and load the results into different targets. We also covered advanced features, best practices for performance, and real-world scenarios.

As you progress:

  • Explore more operators and hooks to reduce custom boilerplate.
  • Integrate Airflow with container orchestration platforms like Kubernetes for scalable workloads.
  • Investigate advanced concepts like dynamic DAG generation or distributed runtime environments.

Modern data engineering demands reliable pipelines. With Airflow at the heart of your stack, you have a battle-tested, production-grade tool to unify complex processes. As you continue building and refining your ETL workflows, Airflow will be a central pillar that helps turn raw data into insightful, actionable information. Happy orchestrating!
