SCD Type 2 Implementation | Spark Practical Scenarios

Implementing SCD Type 2: Handling CDC Data Feeds.

Building a historical record pipeline to track state changes in dimensions over time.

Imagine a customer changes their subscription from "Free" to "Premium." In a standard update, you lose the "Free" record. In SCD Type 2, we expire the old row and insert a new one. We will take a daily CDC (Change Data Capture) file from S3 and merge it into our master table using a Full Outer Join pattern (compatible with standard Parquet).

We start with a Master table and a new Batch of updates for the day.

Master Table (Current State)

| user_id | tier | start_date | end_date | is_current |
|---------|------|------------|----------|------------|
| 505     | Free | 2023-01-01 | null     | true       |

Daily CDC Feed (S3 Updates)

| user_id | tier    | updated_at |
|---------|---------|------------|
| 505     | Premium | 2024-01-26 |

To perform SCD Type 2 in Spark, we join the Master and CDC data. We identify which rows are New, which are Expired, and which remain Unchanged.

from pyspark.sql.functions import col, lit

# 1. Load Data
master_df = spark.read.parquet("s3a://warehouse/customers/")
cdc_df = spark.read.parquet("s3a://landing/cdc/date=2024-01-26/")

# 2. Join Master and CDC
# We only join with 'is_current' records of Master
joined_df = master_df.filter(col("is_current") == True).alias("m") \
    .join(cdc_df.alias("c"), col("m.user_id") == col("c.user_id"), "full_outer")

# 3. Handle Three Logical Cases:
# Case A: Row is expired (Exists in both, CDC has new data)
# Case B: Row is new (Exists only in CDC)
# Case C: Row is unchanged (Exists only in Master)
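The three cases are really just full-outer-join bookkeeping on the key set. As a minimal, Spark-free sketch (the `classify` helper and the sample user_ids are ours, purely for illustration):

```python
def classify(master_keys, cdc_keys):
    """Partition user_ids into the three SCD Type 2 cases."""
    expired = master_keys & cdc_keys      # Case A: in both, old row gets end-dated
    new = cdc_keys - master_keys          # Case B: only in CDC, brand-new user
    unchanged = master_keys - cdc_keys    # Case C: only in Master, carry forward
    return expired, new, unchanged

# Current Master has users 505 and 777; today's CDC touches 505 and 999.
expired, new, unchanged = classify({505, 777}, {505, 999})
print(expired, new, unchanged)  # {505} {999} {777}
```

Note that a changed user (505 here) lands in both the expired set (its old row is closed) and in `new_records` (its new state is inserted from the CDC feed).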
    

We use unionByName to stitch the new master together: the prior history, the rows being closed (end_date set), and the new rows being opened. Don't forget the current rows that had no change in today's feed — they must be carried forward too, or they will silently vanish from the table.

# Identify Records to "Close" (End-dating the old Master row)
expired_records = joined_df.filter(col("c.user_id").isNotNull() & col("m.user_id").isNotNull()) \
    .select("m.*") \
    .withColumn("end_date", col("c.updated_at")) \
    .withColumn("is_current", lit(False))

# Identify "New" Records (The new state from CDC)
new_records = cdc_df.select(
    col("user_id"), 
    col("tier"), 
    col("updated_at").alias("start_date"),
    lit(None).cast("string").alias("end_date"),
    lit(True).alias("is_current")
)

# Identify "Existing History" (Previous versions already closed)
history_df = master_df.filter(col("is_current") == False)

# Identify "Unchanged" current records (present in Master, absent from today's CDC)
unchanged_records = joined_df.filter(col("c.user_id").isNull()).select("m.*")

# Final Union
final_master_df = history_df.unionByName(expired_records) \
    .unionByName(unchanged_records) \
    .unionByName(new_records)
Final Master Table

| user_id | tier    | start_date | end_date   | is_current |
|---------|---------|------------|------------|------------|
| 505     | Free    | 2023-01-01 | 2024-01-26 | false      |
| 505     | Premium | 2024-01-26 | null       | true       |

The first row is the expired version; the second is the active one.
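To sanity-check that output, the whole merge can be traced in plain Python on the sample rows above (the dict-based rows and the `scd2_merge` helper are illustrative only, not part of the Spark pipeline):

```python
def scd2_merge(master, cdc):
    """Tiny in-memory SCD Type 2 merge mirroring the Spark job above."""
    cdc_by_user = {r["user_id"]: r for r in cdc}
    out = [r for r in master if not r["is_current"]]  # closed history, kept as-is
    for row in master:
        if not row["is_current"]:
            continue
        change = cdc_by_user.get(row["user_id"])
        if change:  # expire the old current row
            out.append({**row, "end_date": change["updated_at"], "is_current": False})
        else:       # no change today: carry the current row forward
            out.append(row)
    for change in cdc:  # open a new current row for every CDC record
        out.append({"user_id": change["user_id"], "tier": change["tier"],
                    "start_date": change["updated_at"], "end_date": None,
                    "is_current": True})
    return out

master = [{"user_id": 505, "tier": "Free", "start_date": "2023-01-01",
           "end_date": None, "is_current": True}]
cdc = [{"user_id": 505, "tier": "Premium", "updated_at": "2024-01-26"}]
for row in scd2_merge(master, cdc):
    print(row)  # the expired Free row, then the active Premium row
```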

In a production environment, we save this back to S3 using partitionBy on the is_current flag or start_date to optimize downstream queries.

final_master_df.write \
    .mode("overwrite") \
    .partitionBy("is_current") \
    .parquet("s3a://warehouse/customers_v2/")

# Optional: Log the count of changes for monitoring
change_count = expired_records.count()
print(f"Processed {change_count} updates today.")
Q: Why use Full Outer Join instead of a simple Left Join? A Full Outer Join allows us to capture records that exist only in the CDC feed (New Users) AND records that exist only in the Master (Existing users with no changes) in a single pass.
Q: How do you handle multiple updates for the same user in one CDC batch? Deduplicate the CDC dataframe before the join using a Window Function — rank rows with row_number().over(Window.partitionBy("user_id").orderBy(desc("updated_at"))) and keep only rank 1 — so you process just the latest state for each user.
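That dedup step — keep only the newest CDC row per user — can be sketched in plain Python before translating it to a Window function (the `latest_per_user` helper and the sample batch are ours, for illustration):

```python
def latest_per_user(cdc_rows):
    """Keep only the most recent update per user_id, like keeping row_number() == 1."""
    latest = {}
    for row in cdc_rows:
        current = latest.get(row["user_id"])
        if current is None or row["updated_at"] > current["updated_at"]:
            latest[row["user_id"]] = row
    return list(latest.values())

batch = [
    {"user_id": 505, "tier": "Premium", "updated_at": "2024-01-26 09:00"},
    {"user_id": 505, "tier": "Free",    "updated_at": "2024-01-26 08:00"},
]
print(latest_per_user(batch))  # only the 09:00 Premium row survives
```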