Spark Execution Plan
/tldr: How Catalyst optimizes your code into parallel jobs.
THE CORE IDEA: The Multi-Stage Blueprint
A Spark Execution Plan is the complete workflow, determined by the Catalyst Optimizer, that converts your DataFrame operations (like filter, join, groupBy) into a set of efficient, low-level physical tasks distributed across the cluster.
The Optimization Flow: your DataFrame/SQL code → Logical Plan → Optimized Logical Plan → Physical Plan → DAG of Stages and Tasks.
1. The Three Layers of the Plan
1. Logical Plan (What to do)
The abstract, tree-based representation of your code. It shows the requested operations (e.g., "Join these two tables") without considering cluster efficiency or execution details.
2. Optimized Logical Plan (Better What)
Generated by the **Catalyst Optimizer**. It applies rules such as **predicate pushdown** (filtering as early as possible) and **column pruning** (reading only the columns you need) to reduce data volume before physical execution begins (see the sketch after this list).
3. Physical Plan (How to do it)
The final, executable plan. It specifies the concrete low-level operators (scans, filters, exchanges) that map onto Spark RDD operations and determines how the workload will be broken down into **Stages and Tasks** for parallel execution.
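As a minimal PySpark sketch of how these layers show up in practice (the Parquet path and the country/amount columns are hypothetical):

# Hypothetical dataset and columns, used only for illustration.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

events = spark.read.parquet("/data/events")              # hypothetical path

result = (events
          .select("country", "amount")                   # column pruning: only these columns are read
          .filter(F.col("country") == "DE"))             # predicate pushdown: the filter can move into the scan

result.explain(True)  # prints the Parsed, Analyzed, Optimized Logical, and Physical plans

For a Parquet source, the scan node in the Physical Plan typically lists the pushed-down predicate (e.g., PushedFilters) and the pruned column set, confirming that less data is read.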
2. Execution Components (DAG & Shuffle)
DAG (Directed Acyclic Graph)
The Physical Plan is converted into a DAG of RDD operations. The DAG scheduler then splits this graph into Stages based on its shuffle (wide) dependencies.
Stages:
A new Stage begins whenever an operation requires data to be physically moved or exchanged across the cluster; this point is known as a **Shuffle boundary**.
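Continuing the hypothetical events DataFrame from the sketch above, a single wide operation is enough to split a job into two Stages:

# Narrow operations are pipelined together; the groupBy adds a shuffle,
# which the DAG scheduler turns into a Stage boundary.
totals = (events
          .filter(F.col("amount") > 0)    # narrow: runs in the first Stage
          .groupBy("country")             # wide: shuffle boundary
          .sum("amount"))                 # final aggregation runs in the next Stage

totals.explain()  # the Exchange operator in the physical plan marks the boundary

The resulting Stages, and the Tasks inside each one, are also visible per job in the Spark UI.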
Wide vs. Narrow Transformations
Whether a transformation is narrow or wide determines whether data must be shuffled, which is the single biggest factor in performance.
Narrow (No Shuffle)
Each input partition contributes to at most one output partition (e.g., filter, map). These operations are cheap and are pipelined together within a single Stage.
Wide (Shuffle Required)
Each input partition can contribute to multiple output partitions, requiring data movement across the network (e.g., groupByKey, join, repartition). A wide transformation **triggers a new Stage** and involves disk and network I/O, making it the primary performance bottleneck.
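For instance, still using the hypothetical events DataFrame, repartition is wide while coalesce (when reducing the partition count) avoids a full shuffle:

# Both change the number of partitions, but only repartition moves data across the network.
rebalanced = events.repartition(8)   # wide: an Exchange (shuffle) appears in the physical plan
reduced = events.coalesce(2)         # narrow when shrinking: partitions are merged locally, no Exchange

rebalanced.explain()
reduced.explain()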
Optimization Goal: Minimize Shuffle
A key goal of the Catalyst Optimizer is to avoid or cheapen wide transformations, for example by choosing efficient techniques like **Broadcast Hash Joins** (where a small table is sent to every executor so the large table never has to be shuffled).
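You can apply this yourself with the broadcast hint from pyspark.sql.functions; the small countries lookup table below is hypothetical:

from pyspark.sql.functions import broadcast

# Hypothetical small lookup table joined to the large events DataFrame.
countries = spark.createDataFrame(
    [("DE", "Germany"), ("FR", "France")],
    ["country", "country_name"],
)

joined = events.join(broadcast(countries), "country")  # ship the small table to every executor
joined.explain()  # look for BroadcastHashJoin instead of SortMergeJoin plus Exchange

Spark also broadcasts automatically when a table's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).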
3. Viewing the Plan (EXPLAIN)
You can inspect the execution plan for any DataFrame or SQL query using the explain() method (or the EXPLAIN SQL command). This is critical for debugging performance issues.
# For DataFrame (PySpark):
from pyspark.sql import functions as F
my_dataframe.filter(F.col("price") > 100).explain(True)  # True prints the extended (all-layer) plan
-- For SQL:
EXPLAIN EXTENDED
SELECT id, sum(amount) FROM sales GROUP BY id;
In the output, check which join strategy was chosen: SortMergeJoin (often slow) or BroadcastHashJoin (often fast). Also check for Exchange operators, each of which marks a Shuffle boundary.
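On Spark 3.0 and later, the formatted explain mode groups operators and makes Exchange and join nodes easier to spot (other_dataframe is a hypothetical second table):

# For DataFrame (mode accepts "simple", "extended", "codegen", "cost", "formatted"):
my_dataframe.join(other_dataframe, "id").explain(mode="formatted")
-- For SQL:
EXPLAIN FORMATTED
SELECT id, sum(amount) FROM sales GROUP BY id;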
The fastest operations are the ones you avoid.