Adaptive Query Execution in Apache Spark: Runtime Optimization Framework

Introduction

Adaptive Query Execution (AQE) is a runtime optimization framework introduced in Apache Spark 3.0 that dynamically adjusts query execution plans based on statistics collected during query execution. Unlike traditional static query optimization, where the execution plan is fixed before processing begins, AQE enables Spark to re-optimize sub-plans at runtime, leading to improved performance in scenarios involving data skew, variable partition sizes, or inaccurate initial size estimates. This framework builds on Spark's Catalyst optimizer by incorporating runtime metrics, such as actual partition sizes and data volumes, to make informed decisions about join strategies, partition coalescing, and skew mitigation.

AQE is particularly valuable in distributed environments where data characteristics—such as skew or unexpected small partitions—cannot be fully predicted at compile time. Enabled by default since Spark 3.2, AQE applies to Spark SQL, DataFrames, and Datasets, and is fully supported in PySpark. According to the Apache Spark documentation, AQE can reduce query execution time by 2–10× in skewed workloads by automatically addressing common bottlenecks without requiring manual intervention. This article details AQE's architecture, key features, configuration, implementation examples, and best practices, drawing from official Spark documentation, Databricks resources, and PySpark guides.

How AQE Works

AQE operates by inserting an AdaptiveSparkPlan node at the root of the query execution plan. This node acts as a checkpoint where Spark collects runtime statistics after completing materialized query stages (bounded by shuffle or broadcast exchanges) and re-optimizes the remaining stages accordingly. The process follows these steps:

  1. Initial Planning: Catalyst generates a static physical plan based on pre-execution statistics (e.g., table metadata or user-provided hints).

  2. Execution of Early Stages: Spark executes the initial stages, collecting metrics like partition sizes, row counts, and data volumes.

  3. Runtime Re-optimization: At the AdaptiveSparkPlan boundary, AQE analyzes the metrics and applies transformations, such as coalescing partitions or switching join types, to generate a revised plan.

  4. Iterative Execution: The cycle repeats for remaining stages until the query completes. The Spark UI displays evolving plans under the AdaptiveSparkPlan node, with isFinalPlan flagged as false during execution and true upon completion.

This runtime adaptability contrasts with pre-3.0 Spark, where only an experimental form of adaptive shuffle-partition tuning (available since Spark 1.6) existed. In Spark 3.2+, AQE integrates seamlessly with Tungsten's whole-stage code generation, ensuring re-optimized plans remain efficient. Databricks notes that AQE-optimized queries show one or more AdaptiveSparkPlan nodes in the query plan, allowing the plan's evolution to be visualized in the Spark UI.
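To make these steps concrete, the following PySpark sketch (a minimal illustration, not taken from the cited sources) enables AQE, runs a query with a shuffle boundary, and inspects the plan before and after execution to see the AdaptiveSparkPlan node and its isFinalPlan flag.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("AQE Plan Inspection")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# A query with a shuffle boundary, so AQE has a stage to re-optimize after
df = spark.range(1000000).withColumn("bucket", col("id") % 50)
agg = df.groupBy("bucket").count()

# Before execution: the root shows AdaptiveSparkPlan isFinalPlan=false,
# wrapping the initial static plan produced by Catalyst (step 1)
agg.explain()

# Trigger execution (steps 2-4): Spark runs the shuffle map stage, collects
# map output statistics, and re-optimizes the remaining stages
agg.collect()

# Explaining the same DataFrame again typically shows isFinalPlan=true with the
# re-optimized plan; the SQL tab of the Spark UI shows the same evolution
agg.explain()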

Factors influencing AQE effectiveness include data skew (e.g., one partition holding 90% of rows), inaccurate statistics (leading to suboptimal initial joins), and high parallelism (creating many small tasks). AQE mitigates these by leveraging runtime data, and its overhead is minimal (typically under 5% of query time) because it re-optimizes only where the collected statistics indicate a benefit.

Key Features of AQE

AQE encompasses several interconnected features, each addressing a specific performance bottleneck. Each has its own flag, but all of them require spark.sql.adaptive.enabled to be true and work synergistically when combined.

  1. Coalescing Post-Shuffle Partitions After shuffle operations (e.g., groupBy or join), AQE merges small partitions into larger ones to reduce task overhead and improve resource utilization. It uses runtime map output statistics to target a partition size of spark.sql.adaptive.advisoryPartitionSizeInBytes (default: 64 MB). This prevents the "small file problem" in shuffles, where hundreds of tiny tasks dominate execution time (a runnable sketch of this behavior follows this list).

  2. Handling Data Skew in Joins AQE treats a partition as skewed when it is larger than both spark.sql.adaptive.skewJoin.skewedPartitionFactor times the median partition size (default 5.0) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB); for example, with a 100 MB median, a partition must exceed 500 MB to qualify. In sort-merge joins, AQE splits such partitions into smaller, evenly distributed subtasks and replicates the matching partitions on the other side, balancing load without changing the join result.

  3. Dynamic Join Replanning

    • Sort-Merge to Broadcast Join: If runtime statistics reveal one join side is smaller than spark.sql.adaptive.autoBroadcastJoinThreshold (defaults to spark.sql.autoBroadcastJoinThreshold, typically 10 MB), AQE switches to a broadcast hash join, avoiding expensive shuffles.

    • Sort-Merge to Shuffled Hash Join: When all post-shuffle partitions are smaller than spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold (default 0, which effectively disables the conversion until configured), AQE converts to a shuffled hash join, skipping the unnecessary sort.

  4. Dynamic Partition Pruning Closely related to AQE (though enabled separately via spark.sql.optimizer.dynamicPartitionPruning.enabled, on by default since Spark 3.0), dynamic partition pruning skips irrelevant partitions of one join side at runtime using values computed from the other side, reducing I/O. This extends Catalyst's static pruning to dynamic scenarios.

These features are illustrated in the Spark UI, where plan nodes evolve (e.g., from SortMergeJoin to BroadcastHashJoin), and metrics like partition counts decrease post-coalescing.
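The coalescing behavior in particular is easy to observe. The sketch below (an illustrative example under assumed defaults, not taken from the cited sources) runs a small aggregation with the static 200 shuffle partitions and then reads back the actual partition count, which AQE typically collapses to a handful.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("AQE Coalescing Demo")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# A small aggregation: the total shuffle output is far below 200 x 64 MB,
# so AQE merges the post-shuffle partitions toward the advisory size
df = spark.range(1000000).withColumn("group", col("id") % 100)
agg = df.groupBy("group").count()

# With coalescing enabled this typically prints far fewer than the static 200
# partitions (often a single-digit count); with AQE disabled it stays at 200
print(agg.rdd.getNumPartitions())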

Configuration Options

AQE is controlled via Spark SQL configurations, which can be set programmatically, in spark-defaults.conf, or via SQL SET commands. Below is a comprehensive table of key options from Spark 3.2+ (sourced from Apache Spark docs and Databricks).

Property | Default | Description | Since
spark.sql.adaptive.enabled | true | Enables the AQE framework for runtime re-optimization | 3.0.0
spark.sql.adaptive.coalescePartitions.enabled | true | Enables coalescing of small post-shuffle partitions | 3.0.0
spark.sql.adaptive.skewJoin.enabled | true | Enables skew optimization in sort-merge joins | 3.0.0
spark.sql.adaptive.autoBroadcastJoinThreshold | (none; falls back to spark.sql.autoBroadcastJoinThreshold) | Maximum size of a join side for runtime broadcast join conversion | 3.2.0
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Target size for coalesced or split partitions | 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5.0 | Multiplier over the median partition size used for skew detection | 3.0.0
spark.sql.adaptive.localShuffleReader.enabled | true | Enables local shuffle readers to avoid unnecessary shuffles | 3.0.0
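
As a quick reference for the three mechanisms mentioned above, the snippet below (illustrative values, not tuning recommendations) shows the programmatic, SQL, and spark-defaults.conf forms side by side.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Programmatically, per job or notebook session
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# Equivalent SQL SET command
spark.sql("SET spark.sql.adaptive.coalescePartitions.enabled=true")

# Equivalent cluster-wide entries in spark-defaults.conf:
# spark.sql.adaptive.enabled                        true
# spark.sql.adaptive.advisoryPartitionSizeInBytes   128MB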

Code Examples

AQE requires minimal code changes—primarily configuration. Below are PySpark examples demonstrating setup and usage.

Example 1: Enabling AQE for a Skewed Join Workload

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, when

spark = SparkSession.builder \
    .appName("AQE Skew Join Example") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .getOrCreate()

# Sample skewed data: key "0" holds 90% of the rows; the rest spread over keys 1-9
orders_df = (
    spark.range(1000000)
    .withColumn("key", when(col("id") < 900000, "0").otherwise((col("id") % 10).cast("string")))
    .withColumn("value", rand())
)
lookup_df = spark.range(10).withColumn("key", col("id").cast("string"))

# Without AQE: the skewed key produces straggler tasks that dominate the stage
# With AQE: the skewed partition is split into smaller subtasks automatically
result = orders_df.join(lookup_df, "key")

result.explain()  # Shows the AdaptiveSparkPlan root node
result.count()

In this example, AQE's skew handling applies when the join runs as a sort-merge join; because lookup_df is tiny, Spark may simply broadcast it, and at this toy data volume the skewed partition can stay below the 256 MB default threshold. On realistically sized tables, AQE detects the skewed key "0" and splits its partition, balancing tasks across executors (per Spark docs and PySparkGuide).

Example 2: Dynamic Broadcast Join Conversion

# Enable runtime broadcast conversion
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "50MB")

# Query where one side is unexpectedly small after a filter
large_df = spark.read.parquet("/path/to/large_table.parquet")  # ~1 TB fact table
dim_df = spark.read.parquet("/path/to/dim_table.parquet")      # dimension table joined on "id"
small_filtered = large_df.filter("date >= '2025-01-01'")       # filters down to ~20 MB

result = small_filtered.join(dim_df, "id")  # AQE switches to a broadcast join if the filtered side is < 50 MB

result.explain()  # Initial plan: SortMergeJoin → Runtime: BroadcastHashJoin

Databricks documentation highlights how this avoids unnecessary shuffles when initial estimates are inaccurate.

Example 3: Coalescing Partitions via SQL

-- Set via SQL (equivalent to PySpark config)
SET spark.sql.adaptive.coalescePartitions.enabled = true;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes = 128MB;

SELECT product, SUM(sales) as total_sales
FROM sales_table
GROUP BY product;  -- AQE coalesces small post-groupBy partitions

Limitations and Best Practices

While AQE significantly enhances performance, it has limitations. Runtime re-optimization adds minor overhead (1–5% in most cases) and may not trigger if stages are too small or statistics are unavailable. Skew optimization can introduce extra shuffles, and dynamic coalescing reduces parallelism, potentially underutilizing clusters. AQE does not apply to the RDD API, and structured streaming queries generally keep their static plans.

Best practices include:

  • Enable AQE universally (spark.sql.adaptive.enabled=true) unless profiling shows regressions.

  • Set spark.sql.adaptive.coalescePartitions.parallelismFirst=false on resource-constrained clusters to prioritize partition size over parallelism.

  • Use ANALYZE TABLE for accurate initial statistics, reducing reliance on runtime adjustments.

  • Monitor via Spark UI: Look for AdaptiveSparkPlan nodes and metrics like "number of partitions coalesced."

  • Tune thresholds conservatively: Start with defaults and adjust based on workload (e.g., increase skewedPartitionThresholdInBytes for large datasets).

  • Combine with Delta Lake for automatic statistics collection and ZORDERing to amplify AQE's pruning.

Per Spark CodeHub and PySparkGuide, test AQE on representative workloads, using EXPLAIN EXTENDED (or df.explain()) together with the Spark UI to verify plan changes; a consolidated configuration sketch follows.
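
As a hedged starting point that consolidates the practices above (illustrative values only, reusing the sales_table from Example 3), one might begin with:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Favor partition size over parallelism on resource-constrained clusters
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")

# Raise the skew threshold above the 256 MB default for very large datasets
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "512MB")

# Refresh table-level statistics so the initial static plan starts closer to reality
spark.sql("ANALYZE TABLE sales_table COMPUTE STATISTICS")

# Inspect plan changes on a representative query
spark.sql("""
    SELECT product, SUM(sales) AS total_sales
    FROM sales_table
    GROUP BY product
""").explain(mode="extended")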

Conclusion

Adaptive Query Execution represents a paradigm shift in Spark SQL optimization, bridging the gap between static planning and real-world data variability. By leveraging runtime statistics for dynamic adjustments, AQE delivers substantial performance gains—often 2–10× in skewed or partition-heavy queries—while requiring minimal configuration. As Spark evolves toward even more adaptive systems in future releases, AQE's integration with Catalyst and Tungsten ensures it remains a cornerstone for efficient, scalable data processing in production environments.