Understanding Spark’s Execution Plan: How Spark Executes Your Code


Knowing how Spark executes your code internally is essential to getting the most out of this distributed framework. In this article we take a deep dive into the steps Spark goes through once you submit your application code.

To produce an optimized version of your code, Spark creates multiple plans, applies various optimization techniques to them, and finally selects the best candidate to run. That code can be DataFrame or Dataset operations, or SQL. Spark turns it into an execution plan that is executed across the cluster, and with a few commands you can inspect those plans to see how Spark decided to run your code.

The Big Picture

When you write your Spark code (DataFrame/Dataset or RDD), nothing is executed immediately: Spark is lazily evaluated. The actual computation only starts when an action (e.g., count(), collect(), write, show()) is called. Below is the complete flow from code to cluster execution at a high level:

Your Code → SparkSession → SparkContext → Logical Plan (Catalyst) → Optimized Plan → Physical Plan → Stages (DAGScheduler) → Tasks → TaskScheduler → Executors → Results
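To make the laziness concrete, here is a minimal sketch (the s3://events/ path is simply the example dataset used throughout this article):

import org.apache.spark.sql.functions.col

// Transformations only record steps in a logical plan; no job runs here.
val events   = spark.read.parquet("s3://events/")
val filtered = events.filter(col("user_id") === 123)

// The action triggers planning (Catalyst) and execution on the cluster.
filtered.count()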

The part we care about deeply is everything that happens on the driver before tasks are launched: the Catalyst optimizer and the physical planning phase.

How can we see these stages in Spark? (The different versions of the EXPLAIN command)

Spark gives you five official EXPLAIN modes (Spark 3.0+), summarized below; afterwards we will walk through each planning stage in order.

df.explain(mode = "simple")      // Only physical plan (what people usually see)
df.explain(mode = "extended")    // Logical + optimized logical + physical (default for explain(true))
df.explain(mode = "codegen")     // Physical plan + generated Java source code
df.explain(mode = "cost")        // Optimized logical plan with cost statistics (CBO)
df.explain(mode = "formatted")   // Beautiful tree with node IDs, splits, etc.

Apache Spark EXPLAIN() Modes – What You Actually See

| EXPLAIN Mode | Command | What It Shows | When to Use |
|---|---|---|---|
| Simple | df.explain("simple") or just df.explain() | Only the final physical plan (what will actually run); no logical plans, no costs. | Quick check of what Spark will execute. |
| Extended | df.explain("extended") or df.explain(true) | Everything: the Parsed (Unresolved) Logical Plan, Analyzed (Resolved) Logical Plan, Optimized Logical Plan, and Physical Plan. | Full end-to-end debugging; understanding Catalyst transformations. |
| Codegen | df.explain("codegen") | Physical plan plus the generated Java source code for WholeStageCodegen stages. | Investigating codegen problems or the performance of fused operators. |
| Cost | df.explain("cost") | Optimized logical plan with cost statistics from the Cost-Based Optimizer (CBO); statistics only appear if CBO is enabled. | Understanding why Spark chose one plan over another when spark.sql.cbo.enabled=true. |
| Formatted | df.explain("formatted") | Cleanly indented physical plan with node IDs, a separate details section per node, and clearly marked WholeStageCodegen asterisks. | Best for screenshots, presentations, and deep physical plan analysis. |
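The same modes are also available directly from SQL, which can be handy in notebooks or the spark-sql shell. A quick sketch (the events table name is hypothetical):

// EXPLAIN accepts the same modes in SQL (Spark 3.0+); "events" is assumed to be a registered table.
spark.sql("EXPLAIN FORMATTED SELECT country, count(*) FROM events GROUP BY country").show(truncate = false)
spark.sql("EXPLAIN EXTENDED SELECT country, count(*) FROM events GROUP BY country").show(truncate = false)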

Spark Catalyst Optimization Stages

| Stage | Description | Key Rules / Features | Visible In |
|---|---|---|---|
| Unresolved Logical Plan (Parsed) | Tree built directly from your DataFrame/SQL code; column names are still plain strings. | Created by the parser / DataFrame transformations | extended explain (top section) |
| Analyzed Logical Plan (Resolved) | All attributes and functions are resolved against the catalog; each column gets a unique exprId. | ResolveReferences, ResolveFunctions, TypeCoercion, etc. | extended explain |
| Optimized Logical Plan | Catalyst applies dozens of rule-based and cost-based optimizations. | Predicate pushdown, projection pushdown, constant folding, join reordering, CBO (if enabled), etc. | extended and cost explain |
| Physical Plan | Spark picks concrete physical operators (BroadcastHashJoin, SortMergeJoin, HashAggregate, WholeStageCodegen, etc.), refined at runtime by AQE. | Join selection, aggregation strategy, codegen fusing, Adaptive Query Execution | All explain modes (most visible in simple and formatted) |

Stage-by-Stage Deep Dive

1. Unresolved Logical Plan (also called "Parsed Logical Plan")

  • This is the very first tree Catalyst builds from your DataFrame operations.

  • It is called "unresolved" because attribute references (column names) are still strings and have no type or scope information.

  • Catalyst uses the Analyzer later to resolve them.

import org.apache.spark.sql.functions.count

val df = spark.read.parquet("s3://events/")
df.filter("user_id = 123")           // "user_id" is just a string here
  .groupBy("country")
  .agg(count("*"))

Unresolved Logical Plan (you rarely see this directly, but extended explain shows it at the top):

== Parsed Logical Plan ==
Aggregate [country#45], [country#45, count(1) AS count#100]
+- 'Filter ('user_id = 123)          // note: 'user_id is still an unresolved attribute here
   +- Relation[user_id#40,country#45,...] parquet

This tree is built purely from the syntactic structure of your DataFrame operations; no catalog lookups or type checks have happened yet.

2. Analyzed (Resolved) Logical Plan

The Analyzer runs a set of resolution rules against the catalog to:

  • Bind column names to attributes in input relations

  • Resolve functions (e.g., count, sum)

  • Type checking and coercion

  • Subquery resolution

  • View/CTE handling

Key resolution rules:

  • ResolveRelations

  • ResolveReferences

  • ResolveFunctions

  • ResolveAliases

  • ResolveAggregateFunctions, etc.

After this phase, every attribute has an AttributeReference with a unique exprId, data type, nullability, and metadata.

Output example:

== Analyzed Logical Plan ==
Aggregate [country#45], [country#45, count(1) AS count#100L]
+- Filter (user_id#40L = 123)
   +- Relation[user_id#40L LongType,country#45 StringType,...] parquet

Notice user_id#40L – the L suffix means Long, and #40 is a unique expression ID.
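Besides explain(), you can also pull each of these plans out programmatically. A quick sketch using the df from the example above:

// Every Dataset/DataFrame exposes its plans through queryExecution.
val qe = df.queryExecution
println(qe.logical)        // parsed (unresolved) logical plan
println(qe.analyzed)       // analyzed (resolved) logical plan
println(qe.optimizedPlan)  // logical plan after Catalyst optimization
println(qe.executedPlan)   // final physical plan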

3. Optimized Logical Plan (Catalyst Optimization)

This is where much of Spark's magic happens. Catalyst applies a series of rule batches repeatedly until a fixed point is reached.

Major optimizations applied by the Catalyst optimizer (each implemented by one or more rules):

Apache Spark Catalyst Optimizer – Major Optimization Rules

| Optimization | Purpose | Important Rules / Examples |
|---|---|---|
| Constant folding | Evaluate constant expressions at compile time | ConstantFolding: 1 + 1 → 2, true && false → false |
| Predicate pushdown | Push filters as early as possible (especially into data sources) | PushDownPredicates, CombineFilters, Parquet/ORC row-group pruning |
| Projection pushdown | Push only the required columns down to the scan | PushProjectionThroughUnion, column pruning in Parquet scans |
| Null propagation | Eliminate unnecessary null checks when nulls are impossible | NullPropagation simplifies isNotNull checks on non-nullable columns |
| Boolean simplification | Simplify complex boolean logic | BooleanSimplification: a && true → a, De Morgan's laws, etc. |
| Cast simplification | Remove redundant or unnecessary type casts | SimplifyCasts: cast(cast(col as string) as string) → cast(col as string) |
| Column pruning | Remove unused columns as early as possible | ColumnPruning – big I/O savings |
| Limit pushdown | Push limits down towards the scans and through unions | LimitPushDown, LocalLimit / GlobalLimit split |
| Join reordering | Reorder joins to minimize intermediate result size (rule-based) | ReorderJoin (joins smaller, filtered relations first) |
| Join elimination | Remove joins that are provably unnecessary | EliminateOuterJoin, PropagateEmptyRelation (e.g., a side filtered down to zero rows) |
| Project collapsing | Merge consecutive Project operators | CollapseProject – reduces operator overhead |
| No-op removal | Eliminate identity projections and redundant operators | RemoveNoopOperators drops a Project(a, b) sitting directly on top of an identical Project(a, b) |

These rules are grouped into batches that Catalyst applies repeatedly until a fixed point is reached.
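To see predicate pushdown, constant folding, and column pruning at work, here is a small sketch reusing the events dataset from earlier (the exact plan text varies by Spark version):

import org.apache.spark.sql.functions.{col, lit}

val events = spark.read.parquet("s3://events/")
events
  .select(col("user_id"), col("country"), col("date"))
  .filter(col("date") >= "2025-01-01" && lit(1) === lit(1))   // the constant comparison is folded away
  .explain("extended")
// In the FileScan of the physical plan, look for the date predicate under PushedFilters
// and a ReadSchema trimmed to the three selected columns (column pruning).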

Since Spark 2.2 there has also been optional Cost-Based Optimization (CBO), enabled by:

SET spark.sql.cbo.enabled=true
SET spark.sql.cbo.joinReorder.enabled=true

CBO adds:

  • Table/column statistics collection (ANALYZE TABLE ... COMPUTE STATISTICS)

  • Cardinality estimation

  • Cost model for different join strategies (Broadcast vs Shuffle Hash vs Sort-Merge)

With CBO enabled and statistics collected, you will see size and row-count estimates in explain("cost"):

== Optimized Logical Plan ==
Aggregate [...], Statistics(sizeInBytes=..., rowCount=...)
+- Join Inner ..., Statistics(sizeInBytes=..., rowCount=...)
   :- Filter ..., Statistics(sizeInBytes=..., rowCount=...)
   +- Relation[...] parquet, Statistics(sizeInBytes=..., rowCount=...)
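A minimal sketch of enabling it, assuming a table named events is registered in the catalog:

// Enable CBO and join reordering, then collect the statistics the cost model relies on.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")                                // table-level stats
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS FOR COLUMNS user_id, country")   // column-level stats

spark.table("events").groupBy("country").count().explain("cost")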
  

4. Physical Plans

Spark now selects a physical operator for every logical operator. This is done by a sequence of planning strategies (in SparkStrategies.scala):

Spark Logical vs Physical Operators – How Catalyst Chooses the Real Execution

| Logical Operator | Possible Physical Operators (what actually runs) |
|---|---|
| Scan (Relation / Table) | FileSourceScanExec (DataSource V1 file scan), BatchScanExec (DataSource V2, columnar batches), RowDataSourceScanExec (row-based sources such as JDBC), InMemoryTableScanExec (cached data) |
| Filter | FilterExec (always) |
| Project | ProjectExec |
| Aggregate (GROUP BY) | HashAggregateExec (partial + final, the default), SortAggregateExec (when hash-based aggregation cannot be used), ObjectHashAggregateExec (object-based aggregates such as collect_list) |
| Join | BroadcastHashJoinExec (hint or one side ≤ autoBroadcastJoinThreshold), ShuffledHashJoinExec (one side fits in memory after the shuffle), SortMergeJoinExec (large equi-joins, the default), BroadcastNestedLoopJoinExec (non-equi joins with a broadcastable side), CartesianProductExec (non-equi inner joins) |
| Sort | SortExec (external sort if needed) |
| Limit | LocalLimitExec (per partition), GlobalLimitExec (across all partitions) |
| Union | UnionExec |
| Expand | ExpandExec – used for GROUP BY with grouping sets, CUBE, ROLLUP |
| Window | WindowExec; newer versions add WindowGroupLimitExec for rank-based limits |

Spark chooses the physical operator during the physical planning phase based on statistics, configuration (e.g., autoBroadcastJoinThreshold), hints, and Adaptive Query Execution (AQE) runtime decisions.
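Join strategy hints (Spark 3.0+) are the most direct way to influence these choices. A brief sketch, assuming the events and users datasets from this article's examples:

import org.apache.spark.sql.functions.broadcast

val events = spark.read.parquet("s3://events/")
val users  = spark.read.parquet("s3://users/")

events.join(broadcast(users), "user_id")            // request BroadcastHashJoinExec
events.join(users.hint("merge"), "user_id")         // prefer SortMergeJoinExec
events.join(users.hint("shuffle_hash"), "user_id")  // prefer ShuffledHashJoinExec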

Key physical planning decisions:

  1. Join Selection (see the configuration sketch after this list)

    • If one side is ≤ spark.sql.autoBroadcastJoinThreshold (default 10MB) or carries a broadcast hint → BroadcastHashJoinExec

    • Otherwise, equi-joins between large tables default to SortMergeJoinExec

    • ShuffledHashJoinExec is picked for equi-joins only when one side is small enough to build a per-partition hash map and sort-merge is not preferred

    • Non-equi joins fall back to BroadcastNestedLoopJoinExec or CartesianProductExec

  2. Aggregation Strategy

    • HashAggregateExec is the default whenever the aggregation buffer only uses mutable, fixed-width types; it can spill and fall back to sort-based aggregation at runtime

    • SortAggregateExec is chosen when hash-based aggregation cannot be used (e.g., unsupported buffer types)

    • ObjectHashAggregateExec handles object-based aggregate functions such as collect_list or percentile_approx

  3. WholeStage Code Generation (Spark 2.0+)

    • Multiple physical operators are collapsed into a single generated Java class, compiled at runtime with the Janino compiler.

    • Node groups are marked with * in explain() output:

      *(1) Filter
      *(1) ColumnarToRow
      *(1) FileScan parquet ...
    • Each * group becomes one WholeStageCodegenExec → one Java method with fused loops → huge performance win.

  4. Adaptive Query Execution (AQE) – introduced in Spark 3.0, enabled by default since Spark 3.2 (spark.sql.adaptive.enabled=true):

    • Dynamically coalesces shuffle partitions

    • Switches a planned sort-merge or shuffled hash join to a broadcast join at runtime if one side turns out to be smaller than estimated

    • Optimizes skew joins by splitting oversized partitions

    You will see AQE plans change at runtime:

== Physical Plan ==
AdaptiveSparkPlan (final plan after AQE)
  +- AQEShuffleRead (coalesced to 50 partitions)
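As referenced in the join selection notes above, these decisions are driven by a handful of settings. A short sketch with illustrative values:

// Size threshold for automatically broadcasting the smaller join side (default 10MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

// AQE switches (on by default since Spark 3.2).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")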
  

How Everything Connects – From Code to Tasks

Let’s walk through a concrete example:

import org.apache.spark.sql.functions.{col, count}

spark.read.parquet("s3://events/")
  .filter(col("date") >= "2025-01-01")
  .join(spark.read.parquet("s3://users/").hint("broadcast"), "user_id")
  .groupBy("country")
  .agg(count("*").as("users"))
  .write.parquet("s3://output/")

Execution timeline on driver:

  1. write.parquet(...) is an action → it triggers planning

  2. Catalyst builds Unresolved Logical Plan from the lineage

  3. Analyzer resolves columns, functions → Analyzed Logical Plan

  4. Optimizer applies its rule batches → Optimized Logical Plan (date filter pushed into the Parquet scan, broadcast hint preserved)

  5. Physical planner:

    • Chooses BroadcastHashJoinExec because of hint

    • Chooses HashAggregateExec for groupBy

    • Collapses filter + scan + projection into WholeStageCodegen

  6. DAGScheduler:

    • Breaks the plan into stages at shuffle boundaries

    • Stage 1: read users → broadcast exchange

    • Stage 2: read events → filter → hash join → aggregate → shuffle write

    • Stage 3: final aggregation (if partial agg was used)

  7. Each stage → tasks (one per partition)

  8. Tasks run on executors, produce shuffle files or final output.
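To connect the physical plan back to stages, here is a sketch that builds the same pipeline without the final write and prints the executed plan; every Exchange node you see there is a shuffle boundary, i.e. a place where the DAGScheduler cuts a new stage:

import org.apache.spark.sql.functions.{col, count}

val result = spark.read.parquet("s3://events/")
  .filter(col("date") >= "2025-01-01")
  .join(spark.read.parquet("s3://users/").hint("broadcast"), "user_id")
  .groupBy("country")
  .agg(count("*").as("users"))

// Exchange nodes mark shuffle boundaries; the segments between them become stages.
println(result.queryExecution.executedPlan)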

Reading a Real Physical Plan

Example output snippet:

== Physical Plan ==
*(5) HashAggregate(keys=[country#25], functions=[count(1)])
+- Exchange hashpartitioning(country#25, 200), ENSURE_REQUIREMENTS, [plan_id=123]
   +- *(4) HashAggregate(keys=[country#25], functions=[partial_count(1)])
      +- *(3) BroadcastHashJoin [user_id#20L], [user_id#45L], Inner, BuildRight
         :- *(2) Project [user_id#20L, country#25]
         :  +- *(2) Filter (date#22 >= 2025-01-01)
         :     +- *(2) ColumnarToRow
         :        +- FileScan parquet ... 
         +- BroadcastExchange HashedRelationBroadcastMode (build key [user_id#45L])
            +- *(1) FileScan parquet users[user_id#45L] ...
  

Interpretation:

  • * groups = WholeStage CodeGeneration units

  • Numbers in parentheses = WholeStageCodegen ID (which fused group each operator belongs to)

  • Exchange = shuffle boundary (new stage)

  • BroadcastExchange = no shuffle, data broadcasted

  • ColumnarToRow = transition from columnar batches (e.g., the vectorized Parquet reader) to row-based processing

Key Takeaways:

  1. Spark is lazy → everything is just a logical plan until an action.

  2. Catalyst has four major phases:

    • Unresolved → Analyzed → Optimized Logical → Physical

  3. Optimization is rule-based + optional cost-based (CBO).

  4. Physical planning decides join types, aggregation strategy, codegen.

  5. WholeStage CodeGen is one of the biggest performance features introduced in Spark 2.0.

  6. AQE (Spark 3+) makes many decisions at runtime.

  7. Understanding exprId, WholeStageCodegen IDs, and Exchange nodes is key to reading plans.

With this knowledge you can now:

  • Explain why a query is slow by reading the physical plan

  • Force broadcast joins, control aggregation strategy

  • Write better Spark SQL that benefits from predicate pushdown

  • Debug shuffle explosions or codegen failures