Cutting-Edge Data Engineering with Airflow ETL Pipelines

Welcome to your comprehensive guide on building ETL pipelines using Apache Airflow. In this post, we will start from the fundamentals, progress to advanced concepts, and conclude with professional-level implementations. By the end, you’ll have a clear roadmap for installing, configuring, and scaling Airflow to orchestrate reliable and maintainable ETL (Extract, Transform, Load) processes.


Table of Contents#

  1. Introduction to Data Engineering and ETL
  2. Why Airflow?
  3. Setting Up Airflow
  4. Key Airflow Components
  5. Building a Simple ETL Pipeline
  6. Scheduling and Execution
  7. Advanced Airflow Concepts
  8. Scaling Airflow for Production
  9. Best Practices and Patterns
  10. Real-World Use Cases
  11. Conclusion

Introduction to Data Engineering and ETL#

Data engineering is a foundational discipline for modern analytics, data science, and machine learning. It involves designing, building, and maintaining the infrastructure and systems that process large volumes of structured and unstructured data.

One of the core processes in data engineering is the ETL pipeline:

  • Extract data from one or multiple sources.
  • Transform the data by refining, cleaning, or merging it with other data sets.
  • Load the data into a system—often a data warehouse or data lake—ready for analysis or modeling.
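To make the three stages concrete before bringing in an orchestrator, here is a minimal sketch in plain Python. The sample data, the `people` table, and the function names are assumptions made for illustration, and an in-memory SQLite database stands in for a warehouse.

```python
import csv
import io
import sqlite3

# Hypothetical source data; the empty name in the second row is deliberate.
RAW_CSV = "id,name\n1,alice\n2,\n3,carol\n"


def extract(text):
    """Extract: parse CSV text into a list of dict rows."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: keep only rows in which every field is non-empty."""
    return [row for row in rows if all(value for value in row.values())]


def load(rows, conn):
    """Load: write the cleaned rows into a SQLite table."""
    conn.execute("CREATE TABLE IF NOT EXISTS people (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO people (id, name) VALUES (:id, :name)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
loaded = conn.execute("SELECT id, name FROM people ORDER BY id").fetchall()
print(loaded)
```

Each stage is an ordinary function, which is roughly the shape an orchestrator later wraps: one task per stage, with the ordering made explicit.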

The complexity of modern data processes demands dynamic and efficient orchestration tools. Apache Airflow has emerged as a leading platform for building and managing ETL workflows in a robust, scalable manner.


Why Airflow?#

Apache Airflow is an open-source platform that originated at Airbnb and is now a top-level Apache Software Foundation project. It orchestrates complex workflows through a structured, Python-based way to define, schedule, and monitor tasks.

Key Advantages:

  1. Pythonic: Workflows are defined via Python scripts, making it accessible to a large community of data engineers and developers.
  2. Modular: A wide variety of pre-built operators (e.g., BashOperator, PythonOperator, HiveOperator, SnowflakeOperator) reduce boilerplate.
  3. Scalable: Whether you need to schedule a few tasks or orchestrate thousands of tasks, Airflow can scale horizontally across multiple worker nodes.
  4. UI for Monitoring: The Airflow web UI provides clear visibility into the status of tasks, logs, and historical runs.
  5. Community and Ecosystem: A thriving community contributes plugins, providers, and hooks for different data stores and services.

Setting Up Airflow#

Before you can write your first ETL pipeline, you need to install and configure Airflow. Below is a minimum set of steps to get you started locally. For most production environments, you will adapt these steps to run on containers or Kubernetes.

Step 1: Environment and Dependencies#

Airflow runs on Python. It’s recommended to create a clean virtual environment:

# Create a new virtual environment
python -m venv airflow_env
source airflow_env/bin/activate
# Install or upgrade pip
pip install --upgrade pip setuptools wheel

Step 2: Install Airflow#

You can install Airflow from PyPI. It’s best to pin specific versions to ensure consistency if you’re working across multiple teams.

# Quote the requirement so the shell does not try to expand the *
pip install "apache-airflow==2.4.*"

Step 3: Initialize the Airflow Database#

Airflow requires a metadata database to store information about DAG runs, task instances, variables, and connections. By default, the command below creates a local SQLite database, which is fine for development; for production, use PostgreSQL or MySQL.

# Initialize the database
airflow db init

Step 4: Create a User#

# Omitting --password makes the command prompt for one interactively
airflow users create \
    --username admin \
    --firstname John \
    --lastname Doe \
    --role Admin \
    --email admin@example.com

Step 5: Start Airflow Scheduler and Webserver#

To kick things off, run the scheduler and web server:

# Terminal 1: Start scheduler
airflow scheduler
# Terminal 2: Start webserver
airflow webserver --port 8080

You can now access the Airflow UI at http://localhost:8080.


Key Airflow Components#

DAGs#

The Directed Acyclic Graph (DAG) is Airflow’s core concept. It is a logical organization of tasks that must be executed in a certain order without any loops or cycles. DAGs are defined in Python files, which are scanned and loaded into Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    task_1 = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hello, Airflow!"'
    )
    task_1
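The "no loops or cycles" rule is what guarantees that a valid execution order exists. As a rough illustration outside Airflow, the standard-library `graphlib` module (Python 3.9+) can order a small dependency graph and reject a cyclic one. The task names here are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
}
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# An edge from "load" back to "extract" creates a loop, so no valid order exists.
cyclic = {**dependencies, "extract": {"load"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```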

Tasks and Operators#

Within a DAG, there are operators that define specific actions, such as running Python functions, executing SQL queries, or triggering external services. Tasks are the instances of operators.

Commonly Used Operators:

  • BashOperator: Runs a bash command or script.
  • PythonOperator: Executes a specific Python callable function.
  • TriggerDagRunOperator: Triggers other DAGs.
  • SqlSensor, PostgresOperator, HiveOperator, SnowflakeOperator: Database-related tasks.

Task Dependencies#

One of the major features of Airflow is its intuitive approach to specifying dependencies. You can define tasks that run sequentially (e.g., task_1 >> task_2) or in parallel.

from airflow.operators.empty import EmptyOperator
start = EmptyOperator(task_id='start', dag=dag)
middle = EmptyOperator(task_id='middle', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)
start >> middle >> end # This sets the execution order
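Airflow also accepts a list on either side of `>>`, so `start >> [a, b] >> end` fans out to two parallel tasks and joins them again. The toy class below is not Airflow's implementation; it is a small sketch of how operator overloading can record such dependencies, with `ToyTask` and the task names invented for the example.

```python
class ToyTask:
    """A toy stand-in for an operator; not Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # `a >> b` records that b depends on a, then returns b so chains work.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other

    def __rrshift__(self, others):
        # Handles `[a, b] >> c`, where the left-hand side is a plain list.
        for source in others:
            self.upstream.add(source.task_id)
        return self


start, clean, validate, end = (
    ToyTask(name) for name in ("start", "clean", "validate", "end")
)
start >> [clean, validate] >> end  # fan out to two tasks, then join

print(sorted(clean.upstream), sorted(validate.upstream), sorted(end.upstream))
```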

Hooks and Connections#

Airflow provides Hooks to abstract connections to external systems like databases, cloud services, or APIs. For instance, PostgresHook or MySqlHook handle authentication details and provide a structured way to run queries.

You can manage connections securely in the Airflow UI under the Admin > Connections section.

| Connection Type | Example Hook | Use Case |
| --- | --- | --- |
| Postgres | PostgresHook | Query or load data |
| MySQL | MySqlHook | Query or load data |
| Google Cloud | GCSHook, BigQueryHook | GCP data interactions |
| S3 (AWS) | S3Hook | Upload or download files |
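Besides the UI, a connection can be supplied as a URI in an environment variable named `AIRFLOW_CONN_<CONN_ID>` (for example `AIRFLOW_CONN_MY_POSTGRES_CONN`). The sketch below uses only the standard library to show which parts of such a URI map onto the fields a hook works with. The credentials and host are made up, and Airflow's own parser applies extra normalization that is not reproduced here.

```python
from urllib.parse import urlsplit

# Hypothetical credentials and host; never hard-code real secrets.
uri = "postgresql://etl_user:s3cret@db.example.com:5432/analytics"

parts = urlsplit(uri)
conn = {
    "conn_type": parts.scheme,
    "login": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "schema": parts.path.lstrip("/"),
}
print(conn)
```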

Building a Simple ETL Pipeline#

Let’s illustrate a straightforward scenario: You have CSV files landing in an S3 bucket every day. You want to extract these files, perform basic data cleaning, and finally load them into a PostgreSQL database.

Extract#

  1. List new files in the S3 bucket
  2. Download these files to a local or temporary directory
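
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

BUCKET = 'my-data-bucket'

default_args = {
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

def download_files(ti):
    # The Amazon provider ships no S3DownloadOperator, so fetch each key with S3Hook.
    hook = S3Hook(aws_conn_id='aws_default')
    for key in ti.xcom_pull(task_ids='list_s3_files') or []:
        if key.endswith('/'):
            continue  # skip "folder" placeholder keys
        local_path = os.path.join('/tmp', os.path.basename(key))
        hook.get_key(key, bucket_name=BUCKET).download_file(local_path)

with DAG(
    dag_id='simple_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    # Returns the matching keys, which Airflow stores as this task's XCom.
    list_s3_files = S3ListOperator(
        task_id='list_s3_files',
        bucket=BUCKET,
        prefix='incoming_csv/'
    )
    download_s3_files = PythonOperator(
        task_id='download_s3_files',
        python_callable=download_files
    )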

Transform#

Data transformation can be handled in multiple ways. You could use a PythonOperator, a BashOperator that launches a Spark job, or a specialized plugin. Here, we’ll do a quick example in Python.

import os

import pandas as pd
from airflow.operators.python import PythonOperator

def transform_data(**context):
    # Keys listed by the upstream task; fall back to an empty list if none were found
    s3_file_list = context['ti'].xcom_pull(task_ids='list_s3_files') or []
    for file_name in s3_file_list:
        full_path = os.path.join('/tmp', file_name.split('/')[-1])
        df = pd.read_csv(full_path)
        # Example transformation: dropping rows with null values
        df.dropna(inplace=True)
        df.to_csv(full_path, index=False)

# provide_context is not needed in Airflow 2; the context is passed automatically
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data
)

Load#

For loading into PostgreSQL, you could use a PostgresHook or PostgresOperator. Here’s a simple illustration:

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres_conn',
    sql="""
        CREATE TABLE IF NOT EXISTS my_data (
            column1 VARCHAR(255),
            column2 VARCHAR(255),
            ...
        );
    """
)

# COPY ... FROM '<path>' reads the file on the PostgreSQL server host,
# so this only works if the server itself can see /tmp/my_data.csv.
load_data = PostgresOperator(
    task_id='load_data',
    postgres_conn_id='my_postgres_conn',
    sql="""
        COPY my_data FROM '/tmp/my_data.csv' DELIMITER ',' CSV HEADER;
    """
)

Putting it all together:

list_s3_files >> download_s3_files >> transform_task >> create_table >> load_data

And that’s your basic ETL flow. This example can be expanded upon or refactored based on your data complexities or organizational requirements.


Scheduling and Execution#

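Airflow’s scheduler triggers DAG runs at intervals expressed as presets (e.g., @hourly, @daily), cron expressions, or timedelta objects. In the DAG definition, you specify schedule_interval; from Airflow 2.4 onward the same setting is also available as schedule, and schedule_interval is deprecated. Inside tasks, template macros such as {{ ds }} expose the date of the run being processed.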
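One detail that often surprises newcomers is that a scheduled run is triggered after the period it covers has ended, so the daily run for 2023-01-01 starts at midnight on 2023-01-02. The sketch below works through that arithmetic with the standard library only; `daily_run_window` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta

# Cron equivalents of two common presets.
PRESETS = {"@hourly": "0 * * * *", "@daily": "0 0 * * *"}


def daily_run_window(logical_date):
    """Return (interval_start, interval_end) for a @daily run.

    The run that covers an interval is triggered once that interval has ended.
    """
    interval_start = logical_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return interval_start, interval_start + timedelta(days=1)


interval_start, interval_end = daily_run_window(datetime(2023, 1, 1))
print(f"covers {interval_start:%Y-%m-%d}, triggered at {interval_end:%Y-%m-%d %H:%M}")
```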

Airflow Executors#

Executors define how Airflow distributes the work:

| Executor | Description |
| --- | --- |
| SequentialExecutor | Single-threaded, primarily for local development. |
| LocalExecutor | Executes tasks in parallel using the resources of a single machine. |
| CeleryExecutor | Distributes tasks to multiple worker nodes for large-scale environments. |
| KubernetesExecutor | Dynamically spawns pods for each task in a Kubernetes cluster. |
Choosing the executor depends on your infrastructure, team size, and the complexity of your pipelines.


Advanced Airflow Concepts#

Templating and Macros#

Airflow uses the Jinja templating engine. This allows you to dynamically insert parameters:

from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='templated_task',
    bash_command="""
        echo "Current execution date is {{ ds }}"
        echo "Previous execution date is {{ prev_ds }}"
    """,
    dag=dag
)

You can also leverage custom macros and environment variables to parameterize your tasks.
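To see what the rendered values look like, here is a tiny stand-in for the rendering step, written with the standard library instead of Jinja. The `render` helper is hypothetical and handles only plain `{{ name }}` placeholders; `ds` is the run date as YYYY-MM-DD and `ds_nodash` the same date as YYYYMMDD.

```python
import re
from datetime import date, timedelta


def render(template, context):
    """Replace {{ name }} placeholders; a tiny stand-in for Jinja rendering."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


logical_date = date(2023, 1, 2)
context = {
    "ds": logical_date.isoformat(),
    "ds_nodash": logical_date.strftime("%Y%m%d"),
    # Assumes a daily schedule, so the previous run is one day earlier.
    "prev_ds": (logical_date - timedelta(days=1)).isoformat(),
}
rendered = render("incoming_csv/{{ ds }}/file_{{ ds_nodash }}.csv", context)
print(rendered)
```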

XCom#

XCom (cross-communication) is Airflow’s way to share data between tasks. Instead of storing large data sets, it’s intended for small pieces of metadata (e.g., file names, record counts).

from airflow.operators.python import PythonOperator

def push_function(**context):
    # Store a small value under an explicit key
    context['ti'].xcom_push(key='filename', value='file_20230101.csv')

def pull_function(**context):
    # Read the value pushed by push_task
    data = context['ti'].xcom_pull(key='filename', task_ids='push_task')
    print(f"Received filename: {data}")

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function
)
pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function
)

push_task >> pull_task
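Conceptually, XCom behaves like a small key-value store addressed by task ID and key. A value returned from an operator is stored under the key `return_value`, which is why pulling by task ID alone works. The class below is an in-memory toy that mirrors that lookup; it is not how Airflow stores XComs.

```python
class ToyXCom:
    """In-memory stand-in for the XCom table; not Airflow's implementation."""

    def __init__(self):
        self._store = {}

    def push(self, task_id, value, key="return_value"):
        self._store[(task_id, key)] = value

    def pull(self, task_id, key="return_value"):
        # A missing entry yields None rather than raising.
        return self._store.get((task_id, key))


xcom = ToyXCom()
xcom.push("push_task", "file_20230101.csv", key="filename")
xcom.push("list_s3_files", ["incoming_csv/a.csv"])  # like a returned value

print(xcom.pull("push_task", key="filename"))
print(xcom.pull("list_s3_files"))
print(xcom.pull("push_task"))  # nothing stored under the default key
```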

Variables#

Sometimes you need static or semi-static values such as directory paths, connection strings, or configuration flags. Airflow Variables allow you to store and retrieve these values globally.

from airflow.models import Variable
my_var = Variable.get("MY_VARIABLE_NAME")
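Variables can also be supplied through environment variables named `AIRFLOW_VAR_<NAME>`, and `Variable.get` accepts a `default_var` fallback and a `deserialize_json` flag. The sketch below imitates only the environment-variable path with the standard library. `get_variable` and the `etl_config` name are hypothetical, and the real lookup also consults secrets backends and the metadata database.

```python
import json
import os


def get_variable(name, default=None, deserialize_json=False):
    """Look up AIRFLOW_VAR_<NAME> in the environment, with an optional default."""
    raw = os.environ.get(f"AIRFLOW_VAR_{name.upper()}")
    if raw is None:
        return default
    return json.loads(raw) if deserialize_json else raw


# Hypothetical JSON-valued variable holding pipeline settings.
os.environ["AIRFLOW_VAR_ETL_CONFIG"] = (
    '{"bucket": "my-data-bucket", "prefix": "incoming_csv/"}'
)
config = get_variable("etl_config", deserialize_json=True)
print(config["bucket"])
```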

Airflow Sensors#

Sensors are specialized operators that wait for a certain criterion to be met before proceeding. For example, an S3KeySensor can wait until a file appears in a bucket:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_key='incoming_csv/{{ ds }}/file_{{ ds_nodash }}.csv',
    bucket_name='my-data-bucket',
    aws_conn_id='aws_default',
    poke_interval=60,  # how often to check in seconds
    timeout=3600  # how long to wait for file
)
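The interplay of `poke_interval` and `timeout` is easier to see as a loop. The sketch below is a simplified model rather than Airflow's sensor code: `wait_for` and `FakeClock` are hypothetical, a fake clock replaces real waiting, and a timeout returns False here where a real sensor would fail the task.

```python
import time


def wait_for(condition, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Check `condition` every `poke_interval` seconds until it holds or `timeout` elapses."""
    started = clock()
    while not condition():
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)
    return True


class FakeClock:
    """Test double that advances time instantly instead of sleeping."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
pokes = []


def file_arrives_on_third_poke():
    pokes.append(fake.now)
    return len(pokes) >= 3


found = wait_for(
    file_arrives_on_third_poke,
    poke_interval=60,
    timeout=3600,
    sleep=fake.sleep,
    clock=fake.time,
)
print(found, pokes)
```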

Scaling Airflow for Production#

Celery Executor#

For larger production environments, the CeleryExecutor is often used. It employs a queue-based architecture where a broker (e.g., RabbitMQ or Redis) handles task dispatching, and multiple worker processes pull tasks from the queue.

  1. Airflow Scheduler enqueues tasks.
  2. Celery Workers pick up tasks from the queue.
  3. Result Backend (e.g., Redis or database) stores completed task statuses.
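The three roles above can be mimicked in a few lines with a thread-safe queue. This is a toy model, not Celery: a `queue.Queue` plays the broker, threads play the workers, a dict plays the result backend, and dependency handling is left out entirely.

```python
import queue
import threading

task_queue = queue.Queue()  # plays the broker
result_backend = {}         # plays the result backend
lock = threading.Lock()


def worker():
    """Pull task ids off the queue until a None sentinel arrives."""
    while True:
        task_id = task_queue.get()
        if task_id is None:
            break
        with lock:
            # A real worker would execute the task here.
            result_backend[task_id] = "success"


workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()

# The "scheduler" enqueues tasks, then one sentinel per worker to shut it down.
for task_id in ["extract", "transform", "load"]:
    task_queue.put(task_id)
for _ in workers:
    task_queue.put(None)
for w in workers:
    w.join()

print(sorted(result_backend.items()))
```

Because workers only share the queue, adding capacity means starting more workers, which is the property that lets this architecture scale across machines.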

Kubernetes Executor#

If your organization runs on Kubernetes, the KubernetesExecutor allows tasks to be run inside isolated pods, providing fine-grained resource control. Each task can run in its own environment, simplifying dependency management and improving security.

Monitoring and Alerting#

Reliable monitoring is critical for production. Airflow integrates with external services (PagerDuty, Slack, email) to send alerts on task failures or SLA misses. You can also configure Airflow’s metrics to feed into solutions like Prometheus and Grafana.
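Failure alerts are typically wired up through `on_failure_callback`, which Airflow calls with the task's context dictionary. The sketch below keeps the message formatting as a plain function so it can be exercised without Airflow. The function names and the fake context are assumptions, and `send` would be replaced by a Slack, PagerDuty, or email client.

```python
from types import SimpleNamespace


def format_failure_alert(context):
    """Build a short alert message from a task's callback context."""
    ti = context["task_instance"]
    return f"Task {ti.dag_id}.{ti.task_id} failed: {context.get('exception')}"


def notify_failure(context, send=print):
    """Callback body: format the alert and hand it to a sender."""
    send(format_failure_alert(context))


# In a DAG this could be attached with:
#     default_args = {"on_failure_callback": notify_failure}

# Fake context for a dry run outside Airflow.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="simple_etl", task_id="load_data"),
    "exception": RuntimeError("connection refused"),
}
sent = []
notify_failure(fake_context, send=sent.append)
print(sent[0])
```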


Best Practices and Patterns#

Observability and Logging#

  • Centralized Logging: Store logs in a central location (S3, GCS, or a logging service) to ensure easy access and searchability.
  • Structured Logging: Log key details (file names, transaction IDs, etc.) to enable better filtering and correlation.
  • Task Duration Metrics: Keep an eye on average run times to proactively detect performance bottlenecks.

Testing and CI/CD#

  • Unit Tests: For PythonOperators, ensure the logic is tested in isolation.
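  • DAG Validation: In CI, load the DAG folder with DagBag and fail the build on any import_errors; airflow dags test <dag_id> <date> then executes a single run locally as an end-to-end check.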
  • Linting: Tools like flake8 and black can ensure code quality.
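Testing in isolation is simplest when the callable handed to a PythonOperator delegates to a pure function. Here is a minimal sketch using `unittest`; `drop_incomplete_rows` is a hypothetical transform, not code from the pipeline above.

```python
import unittest


def drop_incomplete_rows(rows):
    """Hypothetical transform: keep rows with no None or empty-string values."""
    return [
        row for row in rows
        if all(value not in (None, "") for value in row.values())
    ]


class DropIncompleteRowsTest(unittest.TestCase):
    def test_rows_with_missing_values_are_removed(self):
        rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": None}]
        self.assertEqual(drop_incomplete_rows(rows), [{"id": 1, "name": "alice"}])

    def test_empty_input_returns_empty_list(self):
        self.assertEqual(drop_incomplete_rows([]), [])


# Run the suite programmatically so the snippet also works when pasted into a script.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DropIncompleteRowsTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Since the function takes and returns plain Python data, the test needs no scheduler, database, or Airflow import.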

Version Control and Environments#

  • Separate Environments: Maintain staging, development, and production Airflow environments.
  • GitOps: Store DAG files and configuration in Git. Automate deployment to orchestrate changes seamlessly.
  • Tagging: Tag releases to revert or identify pipeline states quickly.

Real-World Use Cases#

  1. Data Warehouse Batch Loads: Orchestrating nightly loads from multiple sources into a consolidated warehouse.
  2. Machine Learning Pipeline: Scheduling feature engineering tasks, model training, and model deployment.
  3. Event-Driven Pipelines: Using sensors to trigger processes as soon as new data lands in S3 or when an API endpoint receives fresh data.
  4. Multi-Cloud Orchestration: Combining tasks that run on AWS (EMR or Glue), GCP (BigQuery), and on-premises databases.

Conclusion#

Apache Airflow has become an integral part of the data engineering toolkit. By offering an intuitive, Python-based paradigm and a robust scheduling environment, it enables teams to manage complex ETL pipelines with agility and reliability.

From the basic concepts of DAGs and Operators to advanced features like macros, sensors, and distributed executors, Airflow provides a scalable and extensible framework. Whether you are handling a small daily CSV ingest or orchestrating a multi-step cross-cloud pipeline, Airflow’s modular design, vibrant ecosystem, and rich set of integrations make it an excellent choice.

Keep exploring additional features such as custom plugins, Airflow REST API, and event-driven DAG triggers. Combine Airflow with other open-source tools like dbt or Great Expectations for further validation and transformation. With Airflow as the backbone of your data platform, you can focus on creating business value rather than wrestling with fragile, one-off scripts.

Embrace Airflow’s flexibility, and take your data engineering initiatives to the next level with reliable, scalable, and maintainable ETL pipelines. Happy scheduling!

Cutting-Edge Data Engineering with Airflow ETL Pipelines
https://science-ai-hub.vercel.app/posts/d2e08c02-0476-4b46-94c3-c9f21e7bdacc/4/
Author: AICore
Published at: 2025-03-22
License: CC BY-NC-SA 4.0