Implementing SCD Type 2: Handling CDC Data Feeds
Building a historical record pipeline to track state changes in dimensions over time.
Imagine a customer changes their subscription from "Free" to "Premium." A standard in-place update would overwrite the "Free" record; with SCD Type 2, we instead expire the old row and insert a new one. We will take a daily CDC (Change Data Capture) file from S3 and merge it into our master table using a full outer join pattern, which works on plain Parquet files without requiring MERGE support from a table format.
1. Data Examples
We start with a Master table and a new batch of updates for the day.
Master table:

| user_id | tier | start_date | end_date | is_current |
|---------|------|------------|----------|------------|
| 505 | Free | 2023-01-01 | null | true |

CDC batch (2024-01-26):

| user_id | tier | updated_at |
|---------|------|------------|
| 505 | Premium | 2024-01-26 |
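For reasoning about the merge logic (or unit-testing it) without a Spark cluster, the same sample data can be expressed as plain Python dicts. The names `master_rows` and `cdc_rows` are ours, not part of any library:

```python
# In-memory stand-ins for the Master table and the daily CDC batch above.
# Dates are kept as ISO-8601 strings, mirroring the sample tables.
master_rows = [
    {"user_id": 505, "tier": "Free", "start_date": "2023-01-01",
     "end_date": None, "is_current": True},
]
cdc_rows = [
    {"user_id": 505, "tier": "Premium", "updated_at": "2024-01-26"},
]
```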
2. Joining Master and CDC
To perform SCD Type 2 in Spark, we join the Master and CDC data and identify which rows are New, which are Expired, and which remain Unchanged.
from pyspark.sql.functions import col, lit
# 1. Load Data
master_df = spark.read.parquet("s3a://warehouse/customers/")
cdc_df = spark.read.parquet("s3a://landing/cdc/date=2024-01-26/")
# 2. Join Master and CDC
# We only join with 'is_current' records of Master
joined_df = master_df.filter(col("is_current") == True).alias("m") \
    .join(cdc_df.alias("c"), col("m.user_id") == col("c.user_id"), "full_outer")
# 3. Handle Three Logical Cases:
# Case A: Row is expired (Exists in both, CDC has new data)
# Case B: Row is new (Exists only in CDC)
# Case C: Row is unchanged (Exists only in Master)
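The three cases fall out of full outer join semantics: a matched row has both sides non-null, an unmatched row has nulls on one side. A minimal pure-Python sketch using sets of user_ids (the id `777` is a hypothetical brand-new signup, not from the sample data):

```python
# user_ids present in the current Master rows and in today's CDC batch
master_ids = {505}        # current Master rows
cdc_ids = {505, 777}      # today's CDC batch; 777 is a hypothetical new user

case_a = master_ids & cdc_ids   # expired: present in both sides of the join
case_b = cdc_ids - master_ids   # new: present only in CDC
case_c = master_ids - cdc_ids   # unchanged: present only in Master
```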
3. Expiring and Inserting
We use unionByName to combine the records that need to be closed (end_date set) and the new records that need to be opened.
# Identify Records to "Close" (End-dating the old Master row)
expired_records = joined_df.filter(col("c.user_id").isNotNull() & col("m.user_id").isNotNull()) \
    .select("m.*") \
    .withColumn("end_date", col("c.updated_at")) \
    .withColumn("is_current", lit(False))
# Identify "New" Records (The new state from CDC)
new_records = cdc_df.select(
    col("user_id"),
    col("tier"),
    col("updated_at").alias("start_date"),
    lit(None).cast("string").alias("end_date"),  # cast must match the Master end_date type
    lit(True).alias("is_current"),
)
# Identify "Existing History" (Previous versions already closed)
history_df = master_df.filter(col("is_current") == False)
# Identify "Unchanged" current rows (Case C: no CDC update today) --
# without this, current rows with no change would be dropped from the result
unchanged_df = joined_df.filter(col("c.user_id").isNull()).select("m.*")
# Final Union
final_master_df = history_df \
    .unionByName(unchanged_df) \
    .unionByName(expired_records) \
    .unionByName(new_records)
4. Output Table (SCD Type 2 Result)
| user_id | tier | start_date | end_date | is_current | status |
|---------|------|------------|----------|------------|---------|
| 505 | Free | 2023-01-01 | 2024-01-26 | false | Expired |
| 505 | Premium | 2024-01-26 | null | true | Active |
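The expire-and-insert step that produces the table above can be sketched end-to-end in plain Python, which makes the logic easy to unit-test. The helper name `scd2_merge` is ours, not part of any library; this is a sketch of the pattern, not the Spark implementation:

```python
def scd2_merge(master, cdc):
    """Close current versions that have a CDC update, carry everything
    else forward, and open a new current version per CDC row."""
    cdc_by_id = {r["user_id"]: r for r in cdc}
    out = []
    for row in master:
        if row["is_current"] and row["user_id"] in cdc_by_id:
            # Case A: expire the old version (end-date it)
            out.append(dict(row,
                            end_date=cdc_by_id[row["user_id"]]["updated_at"],
                            is_current=False))
        else:
            # Case C and existing history: carry forward untouched
            out.append(row)
    for r in cdc:
        # Case B: open a new current version from the CDC state
        out.append({"user_id": r["user_id"], "tier": r["tier"],
                    "start_date": r["updated_at"], "end_date": None,
                    "is_current": True})
    return out

master = [{"user_id": 505, "tier": "Free", "start_date": "2023-01-01",
           "end_date": None, "is_current": True}]
cdc = [{"user_id": 505, "tier": "Premium", "updated_at": "2024-01-26"}]
result = scd2_merge(master, cdc)
```

Running this on the sample rows yields the two rows shown in the output table: the expired "Free" version and the active "Premium" version.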
In a production environment, we save this back to S3 using partitionBy on the is_current flag or start_date to optimize downstream queries.
final_master_df.write \
    .mode("overwrite") \
    .partitionBy("is_current") \
    .parquet("s3a://warehouse/customers_v2/")
# Optional: Log the count of changes for monitoring
change_count = expired_records.count()
print(f"Processed {change_count} updates today.")
Interview Q&A