Understanding Spark’s Execution Plan: How Spark Executes Your Code
Knowing how Spark executes your code internally is essential to getting the most out of this distributed framework. You have to understand the steps Spark takes once you submit your application code, and in this article we will deep dive into exactly that.
To produce an optimized version of your code, Spark creates multiple plans, applies various optimization techniques to them, and finally selects the best candidate plan to run. The code can be operations on DataFrames, Datasets, or SQL. Spark builds this execution plan internally so the work can be executed across the cluster, and with a few commands you can inspect those plans to see how Spark decided to execute your code.
The Big Picture
When you write your Spark code (DataFrame/Dataset or RDD), nothing is executed immediately. Spark is lazily evaluated: the actual computation only starts when an action (e.g., count(), collect(), write, show()) is called. Below is the complete flow from code to cluster execution at a high level:
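As a minimal sketch of this laziness (the path and column names are hypothetical), nothing below runs until the final count():

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("lazy-eval-demo").getOrCreate()

// Transformations only build up a logical plan on the driver
val events   = spark.read.parquet("s3://events/")       // hypothetical path
val filtered = events.filter(col("user_id") === 123)    // no job yet
val grouped  = filtered.groupBy("country").count()      // still no job

// Action: only now does Spark analyze, optimize, plan and launch tasks
grouped.count()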
The part we care about deeply is everything that happens on the driver before tasks are launched: the Catalyst optimizer and the physical planning phase.
How can we see these various stages in Spark? (Variants of the EXPLAIN command)
Spark gives you five official EXPLAIN modes (Spark 3.0+). We will go through each stage in order.
df.explain(mode = "simple") // Only physical plan (what people usually see)
df.explain(mode = "extended") // Logical + optimized logical + physical (default for explain(true))
df.explain(mode = "codegen") // Physical plan + generated Java source code
df.explain(mode = "cost") // Optimized logical plan with cost statistics (CBO)
df.explain(mode = "formatted") // Beautiful tree with node IDs, splits, etc.
Apache Spark EXPLAIN() Modes – What You Actually See
| EXPLAIN Mode | Command | What It Shows | When to Use |
|---|---|---|---|
| Simple | df.explain("simple")or just df.explain() |
Only the final physical plan (what will actually run). No logical plans, no costs. | Quick check of what Spark will execute. |
| Extended | df.explain("extended")or df.explain(true) |
Everything: • Parsed (Unresolved) Logical Plan • Analyzed (Resolved) Logical Plan • Optimized Logical Plan • Physical Plan |
Full end-to-end debugging; understand Catalyst transformations. |
| Codegen | df.explain("codegen") |
Physical plan + the actual generated Java source code for WholeStage CodeGen stages. | Investigate codegen problems or performance of fused operators. |
| Cost | df.explain("cost") |
Optimized logical plan with cost estimates from Cost-Based Optimizer (CBO). Only appears if CBO is enabled. |
Understand why Spark chose one plan over another when spark.sql.cbo.enabled=true. |
| Formatted | df.explain("formatted") |
Beautifully indented physical plan with: • Node IDs • Split lines between stages • WholeStage CodeGen asterisks clearly marked |
Best for screenshots, presentations, and deep physical plan analysis. |
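The same modes are also available as a SQL statement, which is handy in spark-sql or notebooks; a small sketch assuming a registered table named events:

// EXPLAIN also works in SQL: EXTENDED, CODEGEN, COST and FORMATTED are supported
spark.sql("EXPLAIN FORMATTED SELECT country, count(*) FROM events GROUP BY country")
  .show(truncate = false)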
Spark Catalyst Optimization Stages
| Stage | Description | Key Rules / Features | Visible In |
|---|---|---|---|
| Unresolved Logical Plan (Parsed) | Tree built directly from your DataFrame/SQL code. Column names are still unresolved strings. | Created by the parser / DataFrame transformations | extended explain (top section) |
| Analyzed Logical Plan (Resolved) | All attributes and functions are resolved against the catalog. Each column gets a unique exprId. | ResolveReferences, ResolveFunctions, TypeCoercion, etc. | extended explain |
| Optimized Logical Plan | Catalyst applies dozens of rule-based and cost-based optimizations. | Predicate Pushdown, Projection Pushdown, Constant Folding, Join Reordering, CBO (if enabled), etc. | extended, cost explain |
| Physical Plan | Spark picks concrete physical operators (BroadcastHashJoin, SortMergeJoin, HashAggregate, WholeStageCodegen, AQE, etc.). | Join selection, aggregation strategy, codegen fusing, Adaptive Query Execution | All explain modes (most visible in simple, formatted) |
Stage-by-Stage Deep Dive
1. Unresolved Logical Plan (also called "Parsed Logical Plan")
This is the very first tree Catalyst builds from your DataFrame operations.
It is called "unresolved" because attribute references (column names) are still strings and have no type or scope information.
Catalyst uses the Analyzer later to resolve them.
import org.apache.spark.sql.functions.count

val df = spark.read.parquet("s3://events/")
df.filter("user_id = 123") // "user_id" is just a string here
  .groupBy("country")
  .agg(count("*"))
Unresolved Logical Plan (you rarely see this directly, but extended explain shows it at the top):
== Parsed Logical Plan ==
'Aggregate ['country], ['country, count(1) AS count]
+- 'Filter ('user_id = 123)   // 'user_id is still an unresolved attribute
   +- Relation[user_id#40,country#45,...] parquet
This tree is built purely by syntactic transformation of DataFrame operations.
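You can also inspect every plan Catalyst produces programmatically through the Dataset's queryExecution field; a quick sketch with a hypothetical path:

val query = spark.read.parquet("s3://events/")   // hypothetical path
  .filter("user_id = 123")
  .groupBy("country")
  .count()

// QueryExecution exposes every planning stage
println(query.queryExecution.logical)        // parsed (unresolved) logical plan
println(query.queryExecution.analyzed)       // analyzed (resolved) logical plan
println(query.queryExecution.optimizedPlan)  // optimized logical plan
println(query.queryExecution.executedPlan)   // final physical plan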
2. Analyzed (Resolved) Logical Plan
The Analyzer runs a set of resolution rules against the catalog to:
Bind column names to attributes in input relations
Resolve functions (e.g., count, sum)
Type checking and coercion
Subquery resolution
View/CTE handling
Key resolution rules:
ResolveRelations
ResolveReferences
ResolveFunctions
ResolveAliases
ResolveAggFunctions, etc.
After this phase, every attribute has an AttributeReference with a unique exprId, data type, nullability, and metadata.
Output example:
== Analyzed Logical Plan ==
Aggregate [country#45], [country#45, count(1) AS count#100L]
+- Filter (user_id#40L = 123)
+- Relation[user_id#40L LongType,country#45 StringType,...] parquet
Notice user_id#40L – the L suffix means Long, and #40 is a unique expression ID.
3. Optimized Logical Plan (Catalyst Optimization)
This is where much of Spark's magic happens. Catalyst repeatedly applies a series of rule batches to rewrite the plan.
Major rule batches (in order of execution):
Apache Spark Catalyst Optimizer – Major Rule Batches
| Batch Name | Purpose | Important Rules / Examples |
|---|---|---|
| ConstantFolding | Evaluate constant expressions at compile time | Literal folding, 1 + 1 → 2, true && false → false |
| PredicatePushdown | Push filters as early as possible (especially into data sources) | PushDownPredicates, CombineFilters, Parquet/ORC row-group pruning |
| ProjectionPushdown | Push required columns down to the scan | PushProjectionThroughUnion, column pruning in Parquet scans |
| NullPropagation | Eliminate unnecessary null checks when nulls are impossible | Optimizes isNotNull checks on non-nullable columns |
| BooleanSimplification | Simplify complex boolean logic | a && true → a, De Morgan’s laws, etc. |
| SimplifyCasts | Remove redundant or unnecessary type casts | cast(cast(col as string) as string) → col |
| ColumnPruning | Remove unused columns as early as possible | ColumnPruning rule – huge I/O savings |
| LimitPushDown | Push limits down toward the data source and through unions | LimitPushDown, LocalLimit → GlobalLimit optimization |
| JoinReordering | Reorder joins to minimize intermediate result size (rule-based) | ReorderJoin (puts smallest relations first) |
| JoinElimination | Remove joins that are proven unnecessary | e.g., joining a table that’s filtered to zero rows |
| CollapseProject | Merge consecutive Project operations | CollapseProject – reduces operator overhead |
| RemoveNoopOperators | Eliminate identity projections and redundant operators | Removes a Project whose output is identical to its input |
These batches are applied repeatedly until a fixed point is reached.
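You can watch some of these rules fire by comparing the analyzed and optimized plans; a small sketch with a hypothetical path, where the constant predicate is folded away and the date filter is pushed toward the scan:

import org.apache.spark.sql.functions.{col, lit}

val q = spark.read.parquet("s3://events/")
  .select("user_id", "country", "date")
  .filter(col("date") >= lit("2025-01-01") && lit(1) === lit(1)) // (1 = 1) is a constant predicate

println(q.queryExecution.analyzed)       // still contains the (1 = 1) predicate
println(q.queryExecution.optimizedPlan)  // constant folded away, date filter pushed toward the Parquet scan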
Since Spark 2.2 there is also an optional Cost-Based Optimizer (CBO), which adds cost-aware rewrites on top of these rule-based batches. Enable it with:
SET spark.sql.cbo.enabled=true
SET spark.sql.cbo.joinReorder.enabled=true
CBO adds:
Table/column statistics collection (ANALYZE TABLE ... COMPUTE STATISTICS)
Cardinality estimation
Cost model for different join strategies (Broadcast vs Shuffle Hash vs Sort-Merge)
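CBO only has something to work with once statistics exist; a sketch assuming a table named events is registered in the catalog:

// Collect table-level and column-level statistics for the CBO
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS FOR COLUMNS user_id, country")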
With CBO enabled and statistics collected, you will see cost numbers in explain("cost"):
== Optimized Logical Plan ==
Aggregate (cost=450000) ...
+- Join Inner (cost=400000) ...
:- Filter (cost=200000) ...
+- Join Inner (broadcast) (cost=150000) ...
4. Physical Plans
Spark now selects a physical operator for every logical operator. This is done by a sequence of planning strategies (in SparkStrategies.scala):
Spark Logical vs Physical Operators – How Catalyst Chooses the Real Execution
| Logical Operator | Possible Physical Operators (what actually runs) |
|---|---|
| Scan (Relation / Table) | FileSourceScanExec (DataSource V1 file scan), BatchScanExec (DataSource V2, columnar batches), RowDataSourceScanExec (e.g., JDBC), InMemoryTableScanExec (cached data) |
| Filter | FilterExec (always) |
| Project | ProjectExec |
| Aggregate (GROUP BY) | HashAggregateExec (partial + final), SortAggregateExec (fallback when hash-based aggregation cannot be used), ObjectHashAggregateExec (object-based aggregation buffers, e.g., collect_list) |
| Join | BroadcastHashJoinExec (auto or hint, ≤ autoBroadcastJoinThreshold), ShuffledHashJoinExec (one side fits in memory after shuffle), SortMergeJoinExec (large-large joins, default for equi-joins), CartesianProductExec (non-equi joins), BroadcastNestedLoopJoinExec (non-equi + broadcast) |
| Sort | SortExec (external sort if needed) |
| Limit | LocalLimitExec (per partition), GlobalLimitExec (across all partitions) |
| Union | UnionExec |
| Expand | ExpandExec (used for GROUP BY with grouping sets, CUBE, ROLLUP) |
| Window | WindowExec (with row/range frames per partition); WindowGroupLimitExec in newer versions for rank-based limits |
Spark chooses the physical operator during the physical planning phase based on statistics, configuration (e.g., autoBroadcastJoinThreshold), hints, and Adaptive Query Execution (AQE) runtime decisions.
Key physical planning decisions:
Join Selection (for equi-joins; see the sketch below)
If one side is estimated at ≤ spark.sql.autoBroadcastJoinThreshold (default 10MB) or carries a broadcast hint → BroadcastHashJoinExec
Else if spark.sql.join.preferSortMergeJoin=false and one side is small enough to build a per-partition hash table → ShuffledHashJoinExec
Else → SortMergeJoinExec, the default for large equi-joins (non-equi joins fall back to BroadcastNestedLoopJoinExec or CartesianProductExec)
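A sketch of nudging the join choice, assuming two hypothetical DataFrames events and users:

import org.apache.spark.sql.functions.broadcast

// Raise the automatic broadcast threshold (in bytes); set to -1 to disable auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

// Or force a broadcast explicitly, regardless of size estimates
val joined = events.join(broadcast(users), "user_id")
joined.explain()   // expect BroadcastHashJoin in the physical plan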
Aggregation Strategy
HashAggregateExec is the default whenever all aggregation buffer fields are mutable primitive types; it can spill and fall back to sort-based aggregation at runtime
ObjectHashAggregateExec is used for object-based aggregate functions (e.g., collect_list, percentile_approx)
SortAggregateExec is the fallback when hash-based aggregation cannot be used (see the example below)
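For example, an object-based aggregate such as collect_list typically shows up as ObjectHashAggregate in the plan (DataFrame and columns are hypothetical):

import org.apache.spark.sql.functions.collect_list

val agg = events.groupBy("country").agg(collect_list("user_id"))
agg.explain()   // typically shows ObjectHashAggregate instead of HashAggregate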
WholeStage Code Generation (Spark 2.0+)
Multiple physical operators are collapsed into a single Java class using Janino compiler.
Node groups are marked with * in explain() output:
*(1) Filter
*(1) ColumnarToRow
*(1) FileScan parquet ...
Each * group becomes one WholeStageCodegenExec → one Java method with fused loops → a huge performance win.
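To investigate or rule out codegen issues, you can dump the generated code or temporarily switch whole-stage codegen off for comparison (df is any DataFrame):

// Show the generated Java source for each whole-stage codegen subtree
df.explain("codegen")

// Disable whole-stage codegen for the current session to compare behaviour
spark.conf.set("spark.sql.codegen.wholeStage", "false")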
Adaptive Query Execution (AQE) – Spark 3.0+, enabled by default since Spark 3.2:
spark.sql.adaptive.enabled=true
At runtime, AQE:
Dynamically coalesces shuffle partitions
Switches a sort-merge or shuffled hash join to a broadcast join if the runtime size was misestimated
Optimizes skew joins (skewed partitions are split)
You will see AQE plans change at runtime:
== Physical Plan ==
AdaptiveSparkPlan (final plan after AQE)
+- AQEShuffleRead (coalesced to 50 partitions)
   ...
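The main AQE switches can be set per session; a minimal sketch:

spark.conf.set("spark.sql.adaptive.enabled", "true")                    // master switch (default in 3.2+)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") // merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")           // split skewed partitions in joins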
How Everything Connects – From Code to Tasks
Let’s walk through a concrete example:
spark.read.parquet("s3://events/")
.filter(col("date") >= "2025-01-01")
.join(spark.read.parquet("s3://users/").hint("broadcast"), "user_id")
.groupBy("country")
.agg(count("*").as("users"))
.write.parquet("s3://output/")
Execution timeline on driver:
.write.parquet(...) is an action → triggers planning and execution
Catalyst builds Unresolved Logical Plan from the lineage
Analyzer resolves columns, functions → Analyzed Logical Plan
Optimizer applies its rule batches → Optimized Logical Plan (filter pushed toward the Parquet scan, broadcast hint respected)
Physical planner:
Chooses BroadcastHashJoinExec because of hint
Chooses HashAggregateExec for groupBy
Collapses filter + scan + projection into WholeStageCodegen
DAGScheduler:
Breaks the plan into stages at shuffle boundaries
Stage 1: read users → broadcast exchange
Stage 2: read events → filter → hash join → aggregate → shuffle write
Stage 3: final aggregation (if partial agg was used)
Each stage → tasks (one per partition)
Tasks run on executors, produce shuffle files or final output.
Reading a Real Physical Plan (Formatted Explain)
Example output snippet:
== Physical Plan ==
*(5) HashAggregate(keys=[country#25], functions=[count(1)])
+- Exchange hashpartitioning(country#25, 200), ENSURE_REQUIREMENTS, [plan_id=123]
+- *(4) HashAggregate(keys=[country#25], functions=[partial_count(1)])
+- *(3) BroadcastHashJoin [user_id#20L], [user_id#45L], Inner, BuildRight
:- *(2) Project [user_id#20L, country#25]
: +- *(2) Filter (date#22 >= 2025-01-01)
: +- *(2) ColumnarToRow
: +- FileScan parquet ...
+- BroadcastExchange HashedRelationBroadcastMode (build key [user_id#45L])
+- *(1) FileScan parquet users[user_id#45L] ...
Interpretation:
* groups = WholeStage CodeGeneration units
Numbers in parentheses = whole-stage codegen ID (each fused subtree gets its own ID)
Exchange = shuffle boundary (new stage)
BroadcastExchange = no shuffle, data broadcasted
ColumnarToRow = transition from Arrow/columnar to row-based processing
Key Takeaways:
Spark is lazy → everything is just a logical plan until an action.
Catalyst has four major phases:
Unresolved → Analyzed → Optimized Logical → Physical
Optimization is rule-based + optional cost-based (CBO).
Physical planning decides join types, aggregation strategy, codegen.
WholeStage CodeGen is one of the biggest performance features introduced in Spark 2.0.
AQE (Spark 3+) makes many decisions at runtime.
Understanding exprId, WholeStageCodegen IDs, and Exchange nodes is key to reading plans.
With this knowledge you can now:
Explain why a query is slow by reading the physical plan
Force broadcast joins, control aggregation strategy
Write better Spark SQL that benefits from predicate pushdown
Debug shuffle explosions or codegen failures
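For example, to confirm that predicate pushdown actually reached the Parquet scan, check the PushedFilters entry in the FileScan node (path and column are hypothetical):

import org.apache.spark.sql.functions.col

val q = spark.read.parquet("s3://events/").filter(col("date") >= "2025-01-01")
q.explain("formatted")
// In the "FileScan parquet" node, look for something like:
//   PushedFilters: [IsNotNull(date), GreaterThanOrEqual(date,2025-01-01)]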