From Batch to Live: Transitioning to Spark Structured Streaming
Table of Contents
- Introduction
- A Quick History of Apache Spark and Streaming
- What Is Structured Streaming?
- Setting Up Your Environment
- Spark Structured Streaming Fundamentals
- Streaming DataFrames and Datasets
- Sources and Sinks
- Triggers
- Output Modes
- Core Transformations and Operations
- Basic Transformations
- Aggregations
- Joins
- Event-Time Processing and Watermarking
- Checkpointing and Fault Tolerance
- Stateful Streaming
- Windows and Time-Based Operations
- Micro-Batch vs. Continuous Processing
- Advanced Topics
- Monitoring and Debugging
- Performance Tuning
- Integrating with Kafka
- Custom Sinks
- Handling Late Data
- Putting It All Together
- Conclusion and Further Reading
1. Introduction
Apache Spark is a powerful, open-source distributed computing system for processing large datasets efficiently. It gained popularity thanks to its versatility, offering different APIs for batch processing, SQL, machine learning, and graph analytics. Over time, Spark evolved to support streaming capabilities, enabling data engineers and scientists to build end-to-end pipelines that can process and analyze live data.
Traditionally, many organizations focused on batch-oriented data processing pipelines, which meant running scheduled jobs (e.g., overnight) to process and transform data in bulk. While this approach works for a number of use cases, it doesn’t cater well to scenarios requiring near real-time updates—like fraud detection, dynamic pricing, IoT data analysis, and so on.
Spark Structured Streaming is Apache Spark’s powerful framework for building continuous, near real-time data pipelines. This blog post explores everything from getting started with Spark Structured Streaming to advanced concepts such as watermarking, windowing, and stateful stream processing. By the end, you will be well equipped to transition your workloads from batch to live, unlocking the potential of continuous data processing in your organization.
2. A Quick History of Apache Spark and Streaming
Spark started as a flexible engine for distributed batch processing, designed to outperform Hadoop’s MapReduce with in-memory computation. Over time, Spark added several APIs:
- RDD (Resilient Distributed Dataset): Spark’s original low-level API.
- DataFrame: High-level API built on top of RDDs, inspired by data frames in R and Python’s pandas.
- Dataset: Type-safe, object-oriented API for Scala and Java, offering more compile-time type safety than DataFrames.
- Structured Streaming: High-level streaming API built on top of DataFrames and Datasets for near real-time data processing.
Before Structured Streaming, Spark offered Discretized Stream (DStream), which was micro-batch based. That earlier streaming engine worked well but required working with RDDs and transformations that were different from batch APIs, creating complexity in unifying batch and streaming pipelines. Structured Streaming unified the APIs, making it possible to handle streaming data using the same DataFrame and Dataset transformations that are used for batch analytics.
3. What Is Structured Streaming?
Structured Streaming is an extension of the Spark SQL engine that allows you to process data streams using the same abstractions and operations you would in batch mode. In practice, Spark Structured Streaming treats streaming data as an unbounded table, where new data continually arrives. As new rows appear in this unbounded table, Spark incrementally updates the results of a query.
Key Features
- End-to-end exactly-once semantics: Structured Streaming can ensure that each record affects the results exactly once, provided the source is replayable (e.g., Kafka or files), checkpointing is enabled, and the sink is idempotent or transactional.
- Same APIs for batch and streaming: You only need to define a transformation pipeline once, and Spark can run it in either batch or streaming mode with minimal changes (see the short sketch after this list).
- Fault tolerance: By design, Spark can recover from node failures, restarts, and partial job failures while preserving processing guarantees.
- Efficient state management: When using features like aggregations, Spark manages state internally and optimizes for memory usage, scale, and fault tolerance.
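To make the “same APIs” point concrete, here is a minimal sketch (the schema, paths, and app name are hypothetical) in which the transformation is identical and only the read/write calls change between batch and streaming:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("BatchVsStreaming").master("local[*]").getOrCreate()

// Hypothetical schema and paths, used only for illustration
val schema = new StructType().add("level", StringType).add("message", StringType)

// Batch version
spark.read.schema(schema).json("data/logs")
  .filter(col("level") === "ERROR")
  .write.mode("overwrite").parquet("out/errors_batch")

// Streaming version: the transformation is identical; only read/write change
val streamQuery = spark.readStream.schema(schema).json("data/logs")
  .filter(col("level") === "ERROR")
  .writeStream
  .format("parquet")
  .option("path", "out/errors_stream")
  .option("checkpointLocation", "out/errors_stream_chk")
  .start()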
4. Setting Up Your Environment
To follow along with the examples provided in this blog post, you’ll need a functional Spark environment. Common ways to set this up include:
- Installing Spark locally from the Apache website.
- Using a Docker container that has Spark pre-configured.
- Setting up a cloud-based environment (Databricks, EMR on AWS, HDInsight on Azure, etc.).
For local development, installing Spark standalone (along with Python or Scala) is often sufficient.
Example: Local Installation Steps for Spark (Mac/Linux)
- Download Spark from the official website: https://spark.apache.org/downloads.html.
- Unpack the downloaded file:
tar -xvf spark-3.3.1-bin-hadoop3.tgz
- Add the Spark bin directory to your PATH, for instance:
export SPARK_HOME=~/spark-3.3.1-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
- Verify the installation:
spark-shell --version
Once your environment is ready, you can follow the examples in Scala, Python, or even Spark Notebooks.
5. Spark Structured Streaming Fundamentals
Streaming DataFrames and Datasets
Structured Streaming operates on DataFrames (or Datasets in Scala/Java) in a streaming context. Essentially, you define a logical query on streaming data, and Spark runs it continuously in micro-batches (or continuous processing if configured) as new data arrives.
For example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("StructuredStreamingExample")
  .master("local[*]")
  .getOrCreate()

// Read streaming data from a directory of JSON files
val lines = spark.readStream
  .format("json")
  .schema(/* your schema here */)
  .load("path/to/your/streaming/data")

// Transform the data
val transformed = lines.withColumn("processed_timestamp", current_timestamp())

// Write the results to the console
val query = transformed.writeStream
  .format("console")
  .start()
query.awaitTermination()
In this snippet:
- readStream indicates we’re reading data in a streaming fashion.
- We specify the source using .format("json") and the path to the data.
- We transform the stream using DataFrame operations, such as adding a new column.
- Finally, we start a streaming query that writes results to a sink.
Sources and Sinks
In Spark Structured Streaming, a source is where your streaming data originates. Sources are often message systems like Kafka, socket streams, or file directories. A sink is where your transformed data goes—possible sinks include files, memory, console, Kafka, or custom implementations. A minimal example using the rate source and console sink follows the lists below.
Common Sources:
- Files (e.g., CSV, JSON, Parquet)
- Kafka
- Socket (for testing or simple demos)
- Rate source (useful for testing)
- Custom sources
Common Sinks:
- Console (for debugging)
- Memory (for smaller, in-memory tables)
- File (e.g., write Parquet files incrementally)
- Kafka
- Foreach sink (for custom handling)
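For a first experiment that needs no external infrastructure, the built-in rate source pairs nicely with the console sink. A minimal sketch for local testing (the rowsPerSecond value is arbitrary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("RateToConsole").master("local[*]").getOrCreate()

// The rate source generates (timestamp, value) rows at a configurable rate
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

// The console sink prints each micro-batch; useful only for debugging
val query = rateDF.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()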
Triggers
A trigger controls the frequency at which Spark checks for new data and updates the output. The main triggers are:
- Fixed-interval micro-batch: Processes data in micro-batches at a user-specified interval, for example:
.trigger(Trigger.ProcessingTime("10 seconds"))
- Continuous: Uses the experimental continuous processing mode (lowest latency, but with certain limitations); the interval given to Trigger.Continuous is the checkpoint interval, not a batching interval.
.trigger(Trigger.Continuous("5 seconds"))
- Once: Processes all available data once and then stops.
.trigger(Trigger.Once())
- Default (no trigger specified): Spark runs micro-batches back to back, starting a new one as soon as the previous one finishes and new data is available.
Output Modes
Structured Streaming supports different ways of updating the query’s output:
- Append: Only newly processed rows are written to the sink. Works for queries that do not modify or remove previously processed data.
- Complete: The entire result table is recalculated and written to the sink. Often used for aggregations and reporting.
- Update: Only changes to the result table are written to the sink. Particularly useful with aggregations, but not supported by all sinks.
Example:
import org.apache.spark.sql.streaming.Trigger

val query = transformed.writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
6. Core Transformations and Operations
Basic Transformations
Most DataFrame transformations (e.g., select, where, withColumn) used in batch contexts remain valid in Structured Streaming. However, keep in mind:
- Some batch transformations (like certain random splits) aren’t fully supported or behave differently in streaming.
- Certain actions (e.g., collect()) are not allowed in streaming contexts because the data is unbounded.
For basic manipulation, you might do something like:
import org.apache.spark.sql.functions._
// Filter messages with certain conditions
val filtered = lines.filter(col("level") === "ERROR")

// Select specific columns
val selected = filtered.select("timestamp", "message")
Aggregations
When dealing with real-time data, aggregations (count, sum, average, etc.) over unbounded streams need special care with time constraints or triggers. Simple aggregations without a window can grow unbounded over time. For example:
// Group by a particular column, aggregating in real time
val counts = lines.groupBy("category").count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
In this example, outputMode("complete") is used because the entire table of category counts changes with every new record; Spark must store and update the entire result set.
Joins
Structured Streaming supports:
- Stream-static join: Joining a streaming dataset with a static (batch) dataset.
- Stream-stream join: Joining two streaming Datasets with support for time boundaries. This is a more advanced feature that typically requires watermarks and windowing.
Example (stream-static join):
val staticDF = spark.read.format("csv").option("header", "true").load("static_data.csv")
val joined = lines.join(staticDF, Seq("id"), "left_outer")
Example (stream-stream join with time constraints):
// Assume both streams have been parsed so that userId and timestamp columns exist
val clicks = spark.readStream.format("kafka").load().as("clicks")
val impressions = spark.readStream.format("kafka").load().as("impressions")

val joinedStream = clicks.join(
  impressions,
  expr("""
    clicks.userId = impressions.userId AND
    clicks.timestamp >= impressions.timestamp - interval 5 minutes AND
    clicks.timestamp <= impressions.timestamp + interval 5 minutes
  """)
)
The exact syntax may vary depending on your schema, but the key point is defining a time window (or range) within which the join should occur.
7. Event-Time Processing and Watermarking
One of the most important aspects of real-time processing is dealing with event time, as opposed to processing time. Event time is the time at which events actually occurred, while processing time is the time at which Spark processes these events.
Why Does This Matter?
- Late data might arrive because of network delays, system outages, or any number of unforeseen issues.
- Relying solely on processing time can lead to incorrect results if data from the past arrives after an aggregation window closes.
Watermarking
Watermarks help Spark efficiently handle late data by bounding how far back in time the engine must keep state. For example:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Assume "timestamp" is an event-time field and the Kafka key carries a grouping key
val withEventTime = events
  .selectExpr("CAST(key AS STRING) AS someKey", "CAST(value AS STRING) AS value", "timestamp")
  .withColumn("eventTime", to_timestamp(col("timestamp")))

// Add watermark: tolerate data up to 10 minutes late
val aggregates = withEventTime
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("someKey"))
  .count()

val query = aggregates.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
query.awaitTermination()
Here, the watermark tells Spark that data arriving more than 10 minutes behind the latest eventTime seen so far can be treated as late and dropped from the aggregation (or routed elsewhere for separate handling). This mechanism prevents the state from growing indefinitely.
8. Checkpointing and Fault Tolerance
Structured Streaming provides fault tolerance by automatically keeping track of cumulative progress. However, you must configure a checkpoint directory where Spark tracks state:
val query = transformed.writeStream
  .format("console")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()
When Spark recovers from a failure, it refers to the checkpoint directory to restore the last known state and pick up from where it left off. This ensures exactly-once or at-least-once semantics, depending on the source and sink.
Checkpointing Best Practices
- Use a reliable, fault-tolerant storage system (e.g., HDFS, S3, Azure Blob Storage).
- Each streaming query must have a unique checkpoint location.
- Don’t delete the checkpoint directory while a query is active; this can corrupt the query’s state.
9. Stateful Streaming
Algorithms that maintain some state (e.g., counters, machine learning models, anomaly detectors) over a data stream need stateful streaming. Structured Streaming allows you to manage state through aggregations, mapGroupsWithState, and flatMapGroupsWithState. These operations let you store per-key aggregates or custom states that persist across micro-batches.
Example: mapGroupsWithState
case class UserEvent(userId: String, action: String, timestamp: Long)
case class UserState(userId: String, actionCount: Long)

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

def updateState(userId: String, events: Iterator[UserEvent], state: GroupState[UserState]): UserState = {
  val currentState = state.getOption.getOrElse(UserState(userId, 0L))
  val updatedCount = events.size + currentState.actionCount
  val newState = UserState(userId, updatedCount)
  state.update(newState)
  newState
}

// Assuming we have a streaming Dataset[UserEvent] called userEvents
// (import spark.implicits._ provides the implicit encoders)
import spark.implicits._
val stateful = userEvents
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateState)

// This stateful Dataset can be written to a sink
val query = stateful.writeStream
  .format("console")
  .outputMode("update")
  .start()
When new events for a specific user arrive, updateState is called with an iterator of those events for the key. You can update the existing state and persist it, enabling use cases like user session tracking or real-time analytics.
10. Windows and Time-Based Operations
Windowing is crucial for periodic reporting and summarizing streaming data. Common window functions include tumbling windows, sliding windows, and session windows.
Tumbling Window
A tumbling window is a fixed-size window that doesn’t overlap. For example:
val windowedCounts = withEventTime
  .groupBy(window(col("eventTime"), "15 minutes"))
  .count()
If you receive event timestamps of 10:00:10, 10:00:20, and 10:14:59, all of these belong to the window [10:00:00, 10:15:00).
Sliding Window
A sliding window can overlap. For instance:
val slidingCounts = withEventTime
  .groupBy(window(col("eventTime"), "15 minutes", "5 minutes"))
  .count()
Each window is 15 minutes long, but they start in increments of 5 minutes. This can be useful for finer-grained analysis.
Session Window
Session windows group events that occur close together in time into the same session and are defined by a gap duration rather than a fixed length. Since Spark 3.2, Structured Streaming supports them through the session_window function (available in SQL and the DataFrame API).
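As a rough sketch, assuming Spark 3.2+, the withEventTime stream from the watermarking section, and a hypothetical userId column, a session window with a 5-minute gap could look like this:

import org.apache.spark.sql.functions.{col, session_window}

// Events for the same userId that arrive within 5 minutes of each other
// fall into one session; a new session starts after a 5-minute gap
val sessionCounts = withEventTime
  .withWatermark("eventTime", "10 minutes")
  .groupBy(session_window(col("eventTime"), "5 minutes"), col("userId"))
  .count()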
11. Micro-Batch vs. Continuous Processing
By default, Structured Streaming operates in micro-batch mode, where each trigger creates a micro-batch. The latency in micro-batch mode is usually on the order of seconds. For workloads requiring very low latency, Spark introduced an experimental continuous processing mode. However, it comes with restrictions on supported operations.
- Micro-batch: Most widely used, supports the full set of Structured Streaming features.
- Continuous: Experimental, limited feature set, potential for sub-second latency.
In practical terms, most production workloads rely on micro-batches due to the availability of more operators and robust support in the ecosystem.
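As an illustration, the sketch below (a hypothetical Kafka-to-Kafka pass-through with only map-like operations, which is what continuous mode supports) differs from a micro-batch job only in its trigger:

import org.apache.spark.sql.streaming.Trigger

// Continuous mode: records are processed as they arrive;
// the interval controls how often progress is checkpointed
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .load()
  .selectExpr("key", "value")               // map-like operations only
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/tmp/checkpoints/continuous_example")
  .trigger(Trigger.Continuous("1 second"))  // swap back to ProcessingTime for micro-batch
  .start()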
12. Advanced Topics
Monitoring and Debugging
Monitoring your streaming job is essential for production stability. Spark’s web UI (usually at http://<driver-host>:4040) provides several useful views:
- Structured Streaming UI: In recent Spark versions, there is a dedicated tab for Structured Streaming, showing the status of ongoing queries, rate of data processing, and more.
- Logs: Always review Spark driver and executor logs for errors or performance bottlenecks.
- Metrics: Expose Spark metrics to external systems like Prometheus or use Spark listeners for custom metric aggregation.
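For the listener-based route, Spark provides StreamingQueryListener, which you register on the session. A minimal sketch that only prints per-batch progress (in practice you would forward these numbers to your metrics system):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // Forward these values to Prometheus, StatsD, etc. instead of printing
    println(s"batch=${p.batchId} inputRows=${p.numInputRows} rowsPerSec=${p.processedRowsPerSecond}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})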
Performance Tuning
- Memory Tuning: Increase executor memory or use more executors if you’re dealing with large stateful operations.
- Batch Size: Adjust how frequently micro-batches occur; longer trigger intervals produce larger batches and better throughput, while shorter intervals reduce latency.
- Aggregation Optimization: Use partitioning strategies or broadcast joins when beneficial.
- Serialization: Choose efficient data formats (e.g., Parquet, Avro for Kafka) to reduce overhead.
- Shuffle Partitions: Tweak spark.sql.shuffle.partitions to optimize the number of partitions used during shuffle operations.
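As a small illustration of the last two points, shuffle parallelism and the trigger interval are typically set when the session and query are defined. The values below are arbitrary starting points, and aggregates refers to the stream built in the watermarking section:

import org.apache.spark.sql.streaming.Trigger

// Fewer shuffle partitions than the default (200) often suits small or medium streaming state
spark.conf.set("spark.sql.shuffle.partitions", "64")

// A longer trigger interval trades latency for larger, more efficient micro-batches
val tunedQuery = aggregates.writeStream
  .format("console")
  .outputMode("update")
  .option("checkpointLocation", "/tmp/checkpoints/tuned_query")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()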
Integrating with Kafka
Kafka is often used for real-time data pipelines, so Spark provides native Kafka source/sink support. Here’s a minimal example:
val spark = SparkSession.builder.appName("KafkaExample").getOrCreate()
// Reading from Kafka
val inputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "input_topic")
  .load()

import org.apache.spark.sql.functions._

// Convert Kafka's value (binary) to string
val dataDF = inputDF.selectExpr("CAST(value AS STRING) as message")

// Processing your stream
val processedDF = dataDF
  .withColumn("processed_at", current_timestamp())

// Writing to Kafka
val query = processedDF.selectExpr("CAST(message AS STRING) as key", "CAST(message AS STRING) as value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka_example")
  .start()
query.awaitTermination()
Make sure you set checkpointLocation so that Spark can manage state and offsets.
Custom Sinks
When none of the built-in sinks meet your requirements (e.g., you need to write data to a non-supported database or a custom external system), you can implement a ForeachWriter in Scala/Java or a similar mechanism in Python.
Example skeleton of a ForeachWriter in Scala:
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row

class CustomSinkWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Open a connection or session; return true to process this partition
    true
  }

  override def process(value: Row): Unit = {
    // Write the row to the custom sink
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the connection/session
  }
}
Then you attach it to your streaming DataFrame:
val query = processedDF.writeStream
  .foreach(new CustomSinkWriter)
  .start()
Handling Late Data
Even with watermarks, data can arrive late beyond the watermark threshold. These records are often dropped from aggregate calculations, but you can handle them separately by:
- Writing late data to a side sink for further manual inspection.
- Using a longer watermark delay (a grace period) to strike a balance between state size and result completeness.
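One hedged way to implement the first idea, assuming the withEventTime stream and category column from the earlier examples (and the imports used there), is to run a second query that archives every raw event, so that anything dropped by the watermark can still be inspected or replayed later:

// Main aggregation: events later than the 10-minute watermark are dropped here
val onTimeCounts = withEventTime
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("category"))
  .count()

// Side query: archive every raw event (no watermark applied), so late arrivals
// can be audited or backfilled with a batch job later
val rawArchive = withEventTime.writeStream
  .format("parquet")
  .option("path", "/data/raw_events_archive")
  .option("checkpointLocation", "/tmp/checkpoints/raw_archive")
  .start()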
13. Putting It All Together
Let’s combine multiple concepts into a simplified end-to-end pipeline example:
- Data Source: Kafka topic receiving JSON messages.
- Parsing and Filtering: Convert messages to a structured format, filter out invalid data.
- Aggregation: Maintain rolling counts per category using event-time with a tumbling window.
- Late Data Handling: Use a watermark of 10 minutes.
- Sink: Write aggregated results back to a different Kafka topic.
Example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object EndToEndExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("EndToEndStructuredStreaming")
      .getOrCreate()

    // 1. Kafka source
    val rawStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "input_topic")
      .load()

    // 2. Parsing and filtering
    val parsedStream = rawStream.selectExpr("CAST(value AS STRING) as jsonValue")
      .select(from_json(col("jsonValue"), /* schema */).as("data"))
      .select("data.*")
      .filter(col("category").isNotNull && col("timestamp").isNotNull)

    // Convert to event time
    val withEventTime = parsedStream
      .withColumn("eventTime", to_timestamp(col("timestamp")))

    // 3. Aggregation with watermark
    val aggregated = withEventTime
      .withWatermark("eventTime", "10 minutes")
      .groupBy(
        window(col("eventTime"), "5 minutes"),
        col("category")
      )
      .count()

    // 4. Sink: Write to Kafka
    val query = aggregated
      .selectExpr(
        "CAST(category AS STRING) AS key",
        "CAST(concat_ws(',', window.start, window.end, category, count) AS STRING) AS value"
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output_topic")
      .option("checkpointLocation", "/tmp/checkpoints/end_to_end")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
What’s happening here:
- We read from a Kafka source, parse the JSON messages, and select valid fields.
- We add a column eventTime and define a watermark of 10 minutes.
- We perform a tumbling window aggregation of 5 minutes on eventTime.
- We output the transformed data back to a Kafka topic, ensuring each micro-batch is tracked via checkpointing.
14. Conclusion and Further Reading
Transitioning from batch to real-time analytics is a critical step for many data-driven organizations. Spark Structured Streaming offers a unified, high-level API that reduces the complexity of building and managing streaming pipelines. By following best practices—such as setting proper watermarks, checkpointing, and monitoring state—you can achieve robust, fault-tolerant streaming applications.
Structured Streaming can handle everything from simple file-based streaming to intricate pipelines involving message systems like Kafka, complex aggregations, stateful operations, and time-windowed analyses. As you become more familiar with the framework, you can extend these patterns to support larger, more demanding systems.
Additional Resources
- Official Spark Structured Streaming Documentation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- Databricks Structured Streaming Guide: https://docs.databricks.com/spark/latest/structured-streaming
- Apache Kafka Documentation for Spark: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- Books:
- “Learning PySpark” by Tomasz Drabas and Denny Lee
- “Spark: The Definitive Guide” by Bill Chambers and Matei Zaharia
Armed with these insights and examples, you should be well on your way to building powerful, real-time data pipelines with Spark Structured Streaming. Whether you’re looking to perform real-time fraud detection, IoT analytics, or dynamic user interactions, Structured Streaming offers a robust and unified framework to bring your data architecture into the real-time era. Enjoy your journey into continuous data processing!