PySpark Joins
TL;DR: 90% of performance hell comes from joins
Skew
Broadcast
AQE
Salt
Buckets
2025 Golden Rule
Never let Spark auto-pick your join strategy.
You control it — or it controls you.
Join Types → Performance Impact
| Join Type | Shuffle? | Speed | When to Use |
|---|---|---|---|
| Broadcast Hash Join | No | Fastest (often ~100×) | Smaller side fits in memory (AQE picks it below the broadcast threshold) |
| Sort-Merge Join (default) | Yes | Slowest | Large + large |
| Shuffle Hash Join | Yes | Medium | One side much smaller than the other, but too big to broadcast |
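The Golden Rule above is enforceable: you can pin any of these strategies yourself with a join hint (Spark 3.0+). A minimal sketch; df_big, df_small, df_other are placeholder DataFrames:
df_big.join(df_small.hint("broadcast"), "id")     # force Broadcast Hash Join
df_big.join(df_other.hint("merge"), "id")         # force Sort-Merge Join
df_big.join(df_other.hint("shuffle_hash"), "id")  # force Shuffle Hash Join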
Broadcast Join = Free 100× Speed
from pyspark.sql.functions import broadcast
# Manual (old way)
df_big.join(broadcast(df_small), "id")
# AQE does this automatically (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "50MB") # default 10MB
# Just write normal join — AQE handles it
df_big.join(df_small, "id")
AQE auto-broadcasts if table < threshold
No manual hint needed in 2025
Set threshold to 100–500MB on big clusters
Check the plan with df.explain(): look for BroadcastHashJoin (sketch below)
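A quick way to confirm the strategy before running the full job (a minimal sketch, reusing the df_big / df_small placeholders from above):
df_big.join(df_small, "id").explain()
# Look for BroadcastHashJoin in the physical plan; SortMergeJoin means the
# small side was judged too big to broadcast.
# With AQE enabled the plan is adaptive, so the final choice may only show
# up after execution (AdaptiveSparkPlan isFinalPlan=true in the Spark UI).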
Join Skew → One Executor Dies
Problem
One key (e.g. "UNKNOWN") holds 90% of the data
Result
One executor runs 10× longer → OOM or timeout
Fix
Salt or AQE auto-skew (confirm the skew first; quick check below)
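Before picking a fix, confirm the skew. A minimal sketch, assuming the join key is called "id":
from pyspark.sql.functions import desc
# Row count per join key; a handful of keys dwarfing the rest = skew
df_big.groupBy("id").count().orderBy(desc("count")).show(10, truncate=False)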
Skew Fixes That Actually Work
1. AQE Auto-Skew (Just Turn On)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
2. Salting (Manual but Bulletproof)
from pyspark.sql.functions import array, col, explode, lit, rand
# Add a random salt (0-9) to every row of the big table
df_big_salted = df_big.withColumn("salt", (rand() * 10).cast("int"))
# Explode the small table: one copy of each row per salt value
df_small_exploded = df_small.select(
    col("id"),
    explode(array([lit(i) for i in range(10)])).alias("salt")
)
# Join on (id, salt), then drop the salt
result = df_big_salted.join(df_small_exploded, ["id", "salt"]).drop("salt")
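If only one or two keys are hot (like the "UNKNOWN" key above), a lighter variant salts just those keys so the rest of the small table is not duplicated 10×. A sketch under that assumption; hot_key is hypothetical:
from pyspark.sql.functions import array, col, explode, lit, rand, when
hot_key = "UNKNOWN"  # hypothetical: the one value that dominates the data
# Big table: random salt only on the hot key, salt 0 everywhere else
df_big_salted = df_big.withColumn(
    "salt",
    when(col("id") == hot_key, (rand() * 10).cast("int")).otherwise(lit(0))
)
# Small table: 10 copies of the hot key's row, 1 copy of everything else
df_small_exploded = df_small.withColumn(
    "salt",
    when(col("id") == hot_key, array([lit(i) for i in range(10)]))
    .otherwise(array(lit(0)))
).withColumn("salt", explode(col("salt")))
result = df_big_salted.join(df_small_exploded, ["id", "salt"]).drop("salt")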
3. Bucketed Joins
# Write once with bucketing (note: bucketBy is not supported by the Delta
# format, so this example writes a bucketed Parquet table)
df.write.format("parquet") \
    .bucketBy(64, "user_id") \
    .sortBy("event_date") \
    .saveAsTable("events_bucketed")
# No shuffle on the join, as long as both sides are bucketed on the join
# key with the same bucket count (see the sketch below)
spark.sql("""
    SELECT * FROM events_bucketed e
    JOIN users_bucketed u ON e.user_id = u.user_id
""")
Pro Tips (Save Millions)
Filter before join → less data = faster (sketch after this list)
Repartition(join keys) only if skewed
Use same partition count on both sides
Check Spark UI → Stages tab for straggler tasks (max task time far above the median)
Use df.explain() → look for SortMergeJoin
AQE + Delta Lake = you win forever
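What "Filter before join" looks like in practice; a sketch with hypothetical events / users tables and an event_date column:
from pyspark.sql.functions import col
# Slower: shuffle everything, then throw most of it away
events.join(users, "user_id").filter(col("event_date") >= "2025-01-01")
# Faster: shrink the big side before the shuffle
events.filter(col("event_date") >= "2025-01-01").join(users, "user_id")
# Catalyst often pushes simple filters down on its own, but writing the
# filter first guarantees it, especially around UDFs and aggregations.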
Final Answer:
AQE ON + Filter Early + Broadcast Small Tables
= 99% of join problems solved