Optimizing Throughput and Latency in Spark Structured Streaming Jobs

Spark Structured Streaming is an engine for real-time analytical processing. It extends the Spark SQL engine to process continuous streams of data while retaining Spark’s structured APIs. With Structured Streaming, you can build streaming pipelines using the Dataset/DataFrame APIs, unify batch and streaming computations, and benefit from Spark’s optimizations such as the Catalyst optimizer and the Tungsten execution engine.

In this blog post, we will explore strategies to optimize throughput and minimize latency in Spark Structured Streaming jobs. We’ll start with the basic concepts, proceed through intermediate techniques, and then cover more advanced optimizations. By the end, you’ll have a clear picture of how to approach performance tuning from streaming architecture design all the way to Spark configuration and resource planning.

Table of Contents#

  1. Introduction to Spark Structured Streaming
  2. Core Concepts and Terminology
  3. Understanding Throughput and Latency
  4. Getting Started with a Simple Example
  5. Micro-Batching vs. Continuous Processing
  6. Key Performance Tuning Strategies
  7. Optimizing Cluster and Resource Configuration
  8. Data Partitioning and Shuffling
  9. Fault Tolerance and Recovery Considerations
  10. Monitoring and Metrics
  11. Advanced Tuning and Best Practices
  12. End-to-End Example with Code Snippets
  13. Common Pitfalls and How to Avoid Them
  14. Conclusion

Introduction to Spark Structured Streaming#

Spark Structured Streaming emerged as the next step in the evolution of Spark Streaming, which initially used the DStreams (Discretized Streams) API. With Structured Streaming, the focus shifted to using the same higher-level abstractions (DataFrames and Datasets) for both batch and streaming workloads. This unification simplifies development, testing, and deployment, as you can reuse much of the same code and data processing logic across different applications.

Unlike the original DStream-based Spark Streaming, which processes data only in micro-batches, Structured Streaming offers both micro-batch processing and an experimental continuous processing mode. This flexibility lets you tune the balance between throughput (the volume of data processed per unit time) and latency (the time from data ingestion to result availability).

Key benefits of Structured Streaming:

  • Familiar DataFrame/Dataset APIs.
  • End-to-end fault tolerance built on Spark’s checkpointing and write-ahead logs (WAL).
  • Seamless integration with a variety of data sources (Kafka, file systems, sockets, etc.) and sinks (files, console, Kafka, etc.).
  • Advanced stateful processing features like streaming joins, aggregations, and window operations.

Core Concepts and Terminology#

Before diving into optimization techniques, let’s clarify some essential terminology used in Spark Structured Streaming:

  • Input Source: The data system or file format from which the streaming job reads events, such as Kafka, socket streams, or file directories.
  • Sink: The target system or file format where processed data is published or persisted, such as Kafka, console logs, or file systems like HDFS.
  • Trigger: Determines how often the streaming query processes new data. For example, a trigger of 1 second, 5 seconds, or “once” (for batch-like behavior).
  • Checkpoint Location: A path (like HDFS or a cloud blob storage) where metadata about the streaming query is stored. This data is used for recovery and exactly-once semantic guarantees.
  • Stateful Transformation: Operations that require tracking accumulated data over time (e.g., streaming joins, windowed aggregations).
  • Watermark: A threshold for handling late data in stream processing, especially for aggregations. Data older than the watermark is considered “late” and can be discarded.

Understanding these concepts is vital when tuning throughput and latency, because different data sources, triggers, transformations, and checkpoint settings can lead to drastically different performance profiles.

Understanding Throughput and Latency#

  • Throughput: The amount of data processed in a given time frame (e.g., records per second or MB/s).
  • Latency: The time elapsed from the moment data is produced/arrives to the moment results are available (or published) to the sink.

A high-throughput system can process large volumes of data per unit time, whereas a low-latency system produces results quickly. There is often a trade-off between them:

  • Increasing micro-batch intervals can improve throughput but introduces extra latency due to batching.
  • Reducing micro-batch intervals may reduce latency but can put more overhead on the system and reduce overall throughput.
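The trade-off can be made concrete with a rough back-of-the-envelope model. The sketch below is not part of Spark; it assumes that every micro-batch pays a fixed overhead (scheduling, offset bookkeeping, commit) plus a per-record cost, and all names and numbers in it are hypothetical. Under that assumption, a longer trigger interval amortizes the fixed overhead over more records (higher sustainable throughput), while each record waits longer on average before its batch even starts (higher latency).

```scala
object MicroBatchModel {
  /** Assumed cost of one micro-batch: a fixed overhead plus a per-record cost (both in ms). */
  final case class BatchCost(fixedOverheadMs: Double, perRecordMs: Double)

  /** Highest input rate (records/s) at which a batch still finishes within its trigger interval. */
  def maxSustainableRate(triggerMs: Double, cost: BatchCost): Double =
    if (triggerMs <= cost.fixedOverheadMs) 0.0
    else (1.0 - cost.fixedOverheadMs / triggerMs) * 1000.0 / cost.perRecordMs

  /** Rough average latency: half a trigger interval of waiting plus the batch processing time. */
  def avgLatencyMs(triggerMs: Double, ratePerSec: Double, cost: BatchCost): Double = {
    val recordsPerBatch = ratePerSec * triggerMs / 1000.0
    val processingMs = cost.fixedOverheadMs + recordsPerBatch * cost.perRecordMs
    triggerMs / 2.0 + processingMs
  }

  def main(args: Array[String]): Unit = {
    val cost = BatchCost(fixedOverheadMs = 50.0, perRecordMs = 0.01) // illustrative numbers only
    for (triggerMs <- Seq(100.0, 1000.0, 5000.0)) {
      val rate = maxSustainableRate(triggerMs, cost)
      val latency = avgLatencyMs(triggerMs, 10000.0, cost)
      println(f"trigger=$triggerMs%.0f ms  maxRate=$rate%.0f rec/s  avgLatency@10k rec/s=$latency%.0f ms")
    }
  }
}
```

Real jobs rarely follow such a clean linear cost model, so treat the output as a way to reason about direction, not as a prediction.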

Getting Started with a Simple Example#

Before we optimize, let’s present a minimal Structured Streaming example. Assume we are reading data from a Kafka topic, performing a basic transformation, and writing to the console sink. Below is a simplified DataFrame-based approach.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SimpleStreamingApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SimpleStreamingApp")
      .getOrCreate()

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "myTopic")
      .load()

    // Convert the binary "value" column to string
    val stringDF = df.selectExpr("CAST(value AS STRING) as message")

    // Simple transformation: split the message by comma and extract fields
    val transformedDF = stringDF
      .withColumn("parts", split(col("message"), ","))
      .withColumn("col1", col("parts").getItem(0))
      .withColumn("col2", col("parts").getItem(1))
      .drop("parts")

    val query = transformedDF.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/path/to/ckpt")
      .start()

    query.awaitTermination()
  }
}

This sample code sets up a basic pipeline: read from Kafka, parse some fields, and write to the console. It’s enough to demonstrate the building blocks of a Structured Streaming job. But to optimize throughput and latency, we need to engage more deeply with triggers, parallelism, resource tuning, and careful choices of transformations.

Micro-Batching vs. Continuous Processing#

Spark Structured Streaming provides two modes of execution:

  1. Micro-Batch Mode:

    • Default mode.
    • The engine collects data over a small interval, forms a micro-batch, processes it, and writes the results to the sink.
    • Triggers can be set to “ProcessingTime” intervals (e.g., 1 second, 10 seconds) or “Once” (one-time batch).
  2. Continuous Processing Mode (Experimental):

    • Tries to push data through the execution plan with lower latency by maintaining continuous tasks.
    • Does not support all transformations (e.g., certain stateful operations are not yet supported).
    • Potentially yields lower latency but may be more limited in features.

Balancing Micro-Batch Intervals#

  • Very short intervals, like 100 ms, can reduce latency but may overwhelm the driver with overhead.
  • Longer intervals, like 1–5 seconds, often strike a balance.
  • Adapting this interval depending on your cluster’s throughput capacity is crucial.

Keep in mind the actual end-to-end latency is not just about micro-batch intervals. Downstream processing, security checks, sink latencies, and network overhead also matter.
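To experiment with the two execution modes, the following sketch switches between trigger types on the same query. It assumes a local run, uses Spark's built-in rate test source and the console sink so that no Kafka cluster is needed, and writes checkpoints under a hypothetical /tmp/trigger-sketch directory. The object name, the mode strings, and the intervals are illustrative choices, not recommendations.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerSketch {
  /** Maps a mode name (hypothetical, for this sketch only) to a trigger. */
  def triggerFor(mode: String): Trigger = mode match {
    case "once"       => Trigger.Once()                   // process what is available, then stop
    case "continuous" => Trigger.Continuous("1 second")   // experimental; the interval is the checkpoint interval
    case _            => Trigger.ProcessingTime("2 seconds") // micro-batch every 2 seconds
  }

  def main(args: Array[String]): Unit = {
    val mode = args.headOption.getOrElse("micro")

    val spark = SparkSession.builder
      .appName("TriggerSketch")
      .master("local[2]") // assumption: local experimentation
      .getOrCreate()

    // The "rate" source generates (timestamp, value) rows for testing
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "100")
      .load()

    val query = input.writeStream
      .format("console")
      .outputMode("append")
      .trigger(triggerFor(mode))
      // One checkpoint directory per mode, so runs in different modes do not share progress
      .option("checkpointLocation", s"/tmp/trigger-sketch/$mode")
      .start()

    query.awaitTermination()
  }
}
```

Running the same query under each mode and comparing batch durations in the Spark UI is a simple way to see how much of the observed latency comes from the trigger itself.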

Key Performance Tuning Strategies#

Schema Inference and Evolution#

When dealing with structured data, specifying the schema upfront helps Spark avoid the overhead of inferring schemas at runtime. For high-throughput systems:

  • Providing a well-defined schema in the code can significantly reduce overhead.
  • If the schema of the data can evolve, consider how Spark will handle new columns or removed columns.
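As a minimal sketch of declaring a schema up front, the code below defines one StructType and reuses it for a JSON file source and for parsing a Kafka payload. The field names (userId, url, eventTime) and the helper names are hypothetical and only serve the illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object ExplicitSchemaSketch {
  // Hypothetical event layout, declared once and shared by both readers
  val eventSchema: StructType = StructType(Seq(
    StructField("userId", StringType, nullable = false),
    StructField("url", StringType, nullable = true),
    StructField("eventTime", TimestampType, nullable = true)
  ))

  /** File source: the schema is supplied explicitly, so nothing is inferred at runtime. */
  def readJsonFiles(spark: SparkSession, dir: String): DataFrame =
    spark.readStream.schema(eventSchema).json(dir)

  /** Kafka source: the payload arrives as bytes, so it is parsed against the same schema. */
  def parseKafkaValue(kafkaDF: DataFrame): DataFrame =
    kafkaDF
      .select(from_json(col("value").cast("string"), eventSchema).as("event"))
      .select("event.*")
}
```

Keeping the schema in one place also gives schema evolution a single point of change: adding a nullable field there is usually less disruptive than renaming or removing one.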

Checkpointing and Write-Ahead Logs (WAL)#

Checkpointing provides fault tolerance by saving information about the progress of the stream and the state of any stateful transformations at a reliable storage location. However, writing large amounts of checkpoint data can slow down the system.

Recommendations:

  • Use a fast, reliable, distributed filesystem such as HDFS, S3, or Azure Blob Storage to store checkpoints.
  • Periodically clean up or archive old checkpoint data (especially for offset logs).
  • Separate checkpoint directories across different streaming queries to prevent conflicts.

Stateful vs. Stateless Operations#

Stateless transformations (e.g., simple select, map, filter) do not need to store any data across triggers. Conversely, operations like aggregations, window functions, and joins with stateful requirements store partial results in memory or on disk.

  • Performance overhead of state management: As state grows, throughput may drop, and checkpoint size may increase.
  • If possible, reduce the scope of state by partitioning the data or by using time windows with a watermark to discard stale states.

Optimizing Joins#

Streaming joins can be expensive. A stream-stream join is stateful: Spark must buffer rows from each side that may still match in the future. A stream-static join keeps no streaming state, but the join itself is still executed on every micro-batch.

  • Stream-Static Join: Often easier to handle, as the static table can be broadcasted (if small enough).
  • Stream-Stream Join: Requires buffering rows from both sides in the state store until matching rows arrive or the watermark lets Spark discard them.
  • Filter rows before the join so that fewer keys and rows have to be tracked in state.

Handling Late Data and Watermarking#

Watermarking is key to bounding how long Spark should maintain state for windows and aggregations. This reduces memory pressure and speeds up execution by cleaning up old data.

Sample code for watermarking and windowed aggregation:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val eventsParsed = events.selectExpr("CAST(value AS STRING) as eventString")
  .withColumn("eventTime", current_timestamp()) // or extracted from the eventString

val windowedCounts = eventsParsed
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window(col("eventTime"), "5 minutes", "1 minute")
  )
  .count()

windowedCounts.writeStream
  .format("console")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
  .awaitTermination()

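This example sets a watermark of 10 minutes, meaning data that is more than 10 minutes behind the maximum event time seen so far is considered late and can be dropped. It aggregates the data in 5-minute windows that slide every 1 minute. Note that the snippet uses current_timestamp() as a stand-in for event time; in a real job the timestamp would be extracted from the message so that the watermark tracks when events actually occurred.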
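To make the arithmetic behind watermarks and sliding windows tangible, here is a small model in plain Scala, with no Spark involved. It is a simplification: it compares an event's timestamp directly against the watermark, whereas Spark's engine applies the check per window and only advances the watermark between micro-batches. All names are hypothetical and times are epoch milliseconds.

```scala
object WatermarkModel {
  /** Watermark = maximum event time seen so far minus the allowed delay. */
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  /** Simplified lateness rule: an event older than the watermark may be dropped. */
  def isTooLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs < watermarkMs

  /** Start times of all sliding windows [start, start + size) that contain the event. */
  def windowStarts(eventTimeMs: Long, sizeMs: Long, slideMs: Long): Seq[Long] = {
    // Latest window start at or before the event, aligned to the slide
    val lastStart = eventTimeMs - Math.floorMod(eventTimeMs, slideMs)
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > eventTimeMs - sizeMs)
      .toList
      .reverse
  }

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val wm = watermark(maxEventTimeMs = 20 * minute, delayMs = 10 * minute)
    println(s"watermark at minute ${wm / minute}")
    println(s"event at minute 9 too late? ${isTooLate(9 * minute, wm)}")
    println(s"windows for event at 7.5 min: ${windowStarts(450000L, 5 * minute, minute).map(_ / minute)}")
  }
}
```

With a 5-minute window sliding every minute, each event belongs to five overlapping windows, which is why sliding windows multiply the amount of state compared with tumbling windows of the same size.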

Optimizing Cluster and Resource Configuration#

Structured Streaming jobs can be resource-intensive. A well-tuned cluster configuration ensures high throughput and low latency:

  1. Executor and Core Sizing:

    • The number of executors and the number of cores per executor determine parallelism.
    • For CPU-bound workloads, distributing tasks across multiple executors can help scale throughput.
    • For I/O-bound workloads (reading from Kafka or writing to external systems), ensure you have enough network bandwidth.
  2. Memory Configuration:

    • For stateful operations, executors need enough memory (heap plus memory overhead) to hold the state.
    • Allocating excessive memory per executor can lead to long garbage collection pauses.
  3. Cluster Manager:

    • The choice of cluster manager (YARN, Kubernetes, or standalone) influences how resources are shared and allocated.
    • If running on Kubernetes, ensure your resource requests and limits are set to handle peak loads.
  4. Network Considerations:

    • Data shuffles and reading/writing to external systems can be network-heavy.
    • Ensure minimal data movement by co-locating compute and data, if possible.
    • Tune network timeouts, compression, and RPC settings in the Spark configuration if needed.

Example Spark Configurations#

| Configuration Property | Description | Example Value |
| --- | --- | --- |
| spark.executor.memory | Memory allocated per executor | 8g |
| spark.executor.cores | Number of CPU cores allocated per executor | 4 |
| spark.executor.instances | Number of executors (YARN or standalone cluster) | 10 |
| spark.sql.shuffle.partitions | Number of partitions for shuffles (used in aggregations/joins) | 200 |
| spark.streaming.backpressure.enabled | Dynamic rate limiting for the legacy DStream API. It has no effect on Structured Streaming queries, which limit intake through source options such as the Kafka source's maxOffsetsPerTrigger | true (DStreams only) |
| spark.sql.streaming.minBatchesToRetain | Minimum number of batches that must be retained and kept recoverable in the checkpoint | 100 |

Tuning these parameters depends heavily on your workload characteristics, data volume, and latency requirements.

Data Partitioning and Shuffling#

Shuffles occur when data is redistributed across executors or partitions (e.g., after a groupBy or join). Minimizing shuffles is crucial to improving performance.

  • Partition Pruning: If possible, push down predicates to reduce data volume.
  • Optimized Joins: Where feasible, broadcast smaller tables in a stream-static join scenario.
  • Watermarking for Shuffle Reduction: By bounding state with watermarks, Spark can reclaim memory and reduce overhead.

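Consider configuring spark.sql.shuffle.partitions to a value that aligns with the size of your data. The default is 200, which might be too high or too low depending on your data volume and cluster size. For stateful queries, choose the value before the first run: the number of state partitions is recorded in the checkpoint and cannot be changed when the query restarts from the same checkpoint location.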
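One way to arrive at a starting value is a simple sizing heuristic. The sketch below is an assumption of this article's editor, not a rule from Spark: it aims for partitions of roughly a target size while never going below the total number of cores, so that no core sits idle during a shuffle stage. The function and parameter names are hypothetical.

```scala
object ShufflePartitionSizing {
  /**
   * Heuristic starting point for spark.sql.shuffle.partitions:
   * enough partitions to keep each near targetPartitionBytes,
   * but at least one partition per available core.
   */
  def suggestPartitions(
      shuffleBytesPerBatch: Long,
      targetPartitionBytes: Long,
      executors: Int,
      coresPerExecutor: Int): Int = {
    require(targetPartitionBytes > 0, "targetPartitionBytes must be positive")
    val bySize = math.ceil(shuffleBytesPerBatch.toDouble / targetPartitionBytes).toInt
    math.max(bySize, executors * coresPerExecutor)
  }

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024L
    // Illustrative numbers: 2 GiB shuffled per batch, 128 MiB target, 10 executors x 4 cores
    val n = suggestPartitions(2048 * mib, 128 * mib, executors = 10, coresPerExecutor = 4)
    println(s"suggested spark.sql.shuffle.partitions = $n")
  }
}
```

The result would be passed as a string, for example through .config("spark.sql.shuffle.partitions", n.toString) on the session builder, and then validated against actual task durations in the Spark UI.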

Fault Tolerance and Recovery Considerations#

When a Structured Streaming job fails or restarts, the checkpoint data ensures that it can resume from the last processed offset. Combined with a replayable source (Kafka, file-based sources) and an idempotent sink, this provides end-to-end exactly-once semantics.

However, if your checkpoint files grow too large, startup and recovery times can become lengthy. Strategies to mitigate include:

  • Frequent housekeeping: Periodically clean old checkpoints, or maintain a separate checkpoint directory for each job or each version.
  • Using Delta Lake or other table formats: Some advanced formats can manage the offsets and states more efficiently.

Monitoring and Metrics#

Dedicated monitoring is essential for diagnosing performance bottlenecks, especially for streaming jobs. Look at:

  • Processing and Scheduling Delays: Track micro-batch scheduling overhead and processing times.
  • Input and Output Rates: Compare ingestion rates with output throughput to detect backlogs.
  • Watermark Delays: Observe how far behind the real-time event time the watermark is.
  • State Store Metrics: For stateful operations, watch memory usage, state cleanup overhead, and checkpoint location usage.

Tools for monitoring:

  • Spark UI: Provides detailed insights into streaming query progress, job execution, and stage times.
  • Ganglia / Grafana / InfluxDB: For cluster-wide monitoring of CPU, memory, and network.
  • AWS CloudWatch or Google Cloud Monitoring (formerly Stackdriver): Managed solutions if your cluster is running in the cloud.
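Beyond the UI, the same progress metrics can be captured programmatically. The following sketch registers a StreamingQueryListener that prints input and processing rates after every micro-batch and flags batches where rows arrive faster than they are processed. The class name and the log format are hypothetical; a production job would more likely forward these values to its metrics system than print them.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: id=${event.id} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // durationMs is a java.util.Map keyed by phase name; the key may be absent
    val triggerMs = Option(p.durationMs.get("triggerExecution")).map(_.longValue)
    println(
      s"batch=${p.batchId} rows=${p.numInputRows} " +
        s"in/s=${p.inputRowsPerSecond} processed/s=${p.processedRowsPerSecond} " +
        s"triggerMs=${triggerMs.getOrElse(-1L)}")
    // A sustained gap here means a backlog is building up
    if (p.inputRowsPerSecond > p.processedRowsPerSecond)
      println(s"WARN batch ${p.batchId}: input rate exceeds processing rate")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: id=${event.id} exception=${event.exception}")
}

object MonitoringSketch {
  /** Attach the listener once per SparkSession, before starting queries. */
  def register(spark: SparkSession): Unit =
    spark.streams.addListener(new ProgressLogger)
}
```

For quick checks without a listener, query.lastProgress on a running StreamingQuery exposes the same fields for the most recent batch.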

Advanced Tuning and Best Practices#

After you’ve covered the basics, there are more advanced techniques to push performance further:

  1. Adaptive Execution: Spark 3.0+ introduces adaptive query execution (AQE) for batch queries. It was designed around batch plans, so do not assume it reduces shuffles in a streaming query; check the behavior on your Spark version before relying on it.
  2. Kafka Partition Rebalancing: If your Kafka source is partitioned poorly (e.g., with only one partition), you’ll have limited parallelism. Ensure your Kafka brokers and partitions match your throughput needs.
  3. Memory Tuning for Streaming State: For complex aggregations, the state can balloon. Consider:
    • Tuning spark.memory.fraction and spark.memory.storageFraction.
    • Adding safeguards, such as watermarks and alerts on state size, to avoid out-of-memory errors if state grows unexpectedly.
  4. Batch and Stream Unification: If you’re mixing batch with streaming, rely on DataFrame transformations that can handle both modes. This unification helps ensure consistent performance tuning across jobs.
  5. Using Continuous Processing: For ultra-low latency, experiment with continuous processing mode. Note the limitations in supported transformations.
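For the Kafka parallelism point, two options of the Kafka source are worth knowing: maxOffsetsPerTrigger caps how many offsets a micro-batch reads, and minPartitions asks Spark to split Kafka partitions into more input partitions than the topic has. The sketch below shows where they go; the broker address, topic name, and numeric values are placeholders chosen for illustration, not tuned recommendations.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceTuningSketch {
  def readClicks(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "clickstream_topic")            // placeholder topic
      // Upper bound on offsets consumed per micro-batch, divided across topic partitions
      .option("maxOffsetsPerTrigger", "100000")
      // Request at least this many Spark input partitions, even if the topic has fewer
      .option("minPartitions", "32")
      .load()
}
```

Splitting on the Spark side helps with read parallelism, but it does not replace adding Kafka partitions when the brokers themselves are the bottleneck.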

End-to-End Example with Code Snippets#

Below is an expanded example illustrating stateful streaming with windowed aggregations, watermarks, and optimized partition settings. Suppose you have clickstream data from Kafka, which you want to analyze in near real-time.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object ClickstreamAnalytics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ClickstreamAnalytics")
      // Configure Spark defaults for streaming
      .config("spark.sql.shuffle.partitions", "200")
      .getOrCreate()

    // Set log level to reduce console output
    spark.sparkContext.setLogLevel("WARN")

    // Read from Kafka
    val clicksDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")
      .option("subscribe", "clickstream_topic")
      .option("startingOffsets", "latest")
      // Cap the offsets read per micro-batch to absorb spikes (illustrative value).
      // spark.streaming.backpressure.enabled only affects the legacy DStream API,
      // so it is not set here.
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // Extract message and timestamps
    val parsedDF = clicksDF.selectExpr("CAST(value AS STRING) as rawMessage")
      .withColumn("eventTime", current_timestamp()) // Alternatively, parse from rawMessage

    // Minimal parsing - real scenario: parse JSON or CSV fields
    val enrichedDF = parsedDF
      .withColumn("userId", expr("substring(rawMessage, 1, 5)")) // mock parsing; SQL substring is 1-based
      .withColumn("url", expr("substring(rawMessage, 6, length(rawMessage))"))

    // Windowed aggregation: count clicks per URL over 5 minute windows
    val windowedCounts = enrichedDF
      .withWatermark("eventTime", "10 minutes") // Tolerate up to 10 minutes late data
      .groupBy(
        window(col("eventTime"), "5 minutes"),
        col("url")
      )
      .count()
      .withColumnRenamed("count", "clickCount")

    val query = windowedCounts.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .format("console")
      .option("checkpointLocation", "/tmp/spark_checkpoints/clickstreamAnalytics")
      .start()

    query.awaitTermination()
  }
}

Explanation#

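  1. Kafka source with startingOffsets = latest means a query that starts without an existing checkpoint reads only new messages. On restart, the query resumes from the offsets stored in the checkpoint instead.
  2. withWatermark("eventTime", "10 minutes") ensures that any event more than 10 minutes behind the maximum observed timestamp is considered late and excluded from current windows.
  3. The aggregation uses tumbling windows of 5 minutes. Every 30 seconds, the streaming query updates the counts for each URL in the window.
  4. Rate limiting: spark.streaming.backpressure.enabled is a setting of the legacy DStream API and does not throttle a Structured Streaming query. To manage spikes in event rates, cap the intake per micro-batch with the Kafka source option maxOffsetsPerTrigger.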

Common Pitfalls and How to Avoid Them#

  1. Ignoring the Necessity of a Checkpoint Location

    • Forgetting to set a checkpointLocation can lead to losing state or repeated data processing. Always specify a reliable checkpoint directory.
  2. Excessive Micro-Batch Frequency

    • Setting an extremely low trigger interval can result in overhead dominating actual work. Balance your micro-batch intervals.
  3. Massive State Store

    • Not using watermarks or bounding state can lead to unbounded state growth, risking out-of-memory errors and degrading performance.
  4. Insufficient Parallelism

    • If you have a single Kafka partition or restrict your Spark executors too heavily, you won’t take advantage of cluster parallelism.
  5. Over-Reliance on Schema Inference

    • At high throughput, allowing Spark to re-infer schemas can be costly. Instead, specify schemas programmatically.
  6. Misconfigured Resources

    • Allocating too large a heap can increase garbage collection times. Allocating too little can result in frequent GC or out-of-memory errors.

Conclusion#

Optimizing throughput and latency in Spark Structured Streaming requires a comprehensive approach that spans application design, cluster configuration, and operational discipline. By choosing the right trigger intervals, bounding state with watermarks, minimizing shuffles, properly checkpointing, and monitoring metrics, you can achieve both high throughput and low latency for your real-time pipelines.

In summary:

  • Start with a solid understanding of Structured Streaming concepts.
  • Tune micro-batch trigger intervals, and consider continuous processing mode where its limitations are acceptable.
  • Allocate and tune cluster resources effectively, including memory, cores, and parallelism.
  • Use advanced features like watermarking, source rate limits, and careful join strategies to balance performance with correctness.
  • Continuously monitor your streaming jobs using Spark’s UI and external monitoring tools.

With these insights, you can build robust, efficient, and scalable streaming applications that meet your organization’s real-time analytics and data processing needs. The powerful abstractions in Spark Structured Streaming make it easier than ever to handle high-velocity data in a fault-tolerant and efficient manner.

Source: https://science-ai-hub.vercel.app/posts/6731dbd2-1c64-44cc-ad38-3ff1a9fcd2f7/8/
Author: AICore
Published at: 2025-02-03
License: CC BY-NC-SA 4.0