Orchestrating Batch ETL with Incremental Loads
Building a scalable, production-ready pipeline with dependency management and data quality gates.
In production, you never reprocess all of the data on every run. We need a pipeline that identifies newly landed files in S3, joins them with the user dimension, computes the KPIs, and updates the final data mart. If the data quality tests fail, the pipeline must halt before polluting the warehouse.
1. Ingestion & Incremental Discovery
We use a high-watermark pattern: check the latest processed timestamp in the target table and read only the S3 files that arrived after it.
/events/2026-01-25/file_a.json
/events/2026-01-26/file_b.json (New Data)
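The same watermark idea in plain Python, as a toy sketch: the `new_files` helper and the in-memory `listing` dict are our own stand-ins for an S3 LIST call.

```python
from datetime import date

def new_files(listing, watermark):
    # Keep only files whose arrival date is strictly after the watermark.
    return sorted(path for path, arrived in listing.items() if arrived > watermark)

listing = {
    "/events/2026-01-25/file_a.json": date(2026, 1, 25),
    "/events/2026-01-26/file_b.json": date(2026, 1, 26),
}
new_files(listing, date(2026, 1, 25))  # → ["/events/2026-01-26/file_b.json"]
```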
from pyspark.sql.functions import col

# High watermark: the latest timestamp already loaded into the mart.
# Watermark on the same column we filter by, or late files slip through.
last_processed_ts = spark.sql(
    "SELECT max(arrival_ts) FROM gold.sales_mart"
).collect()[0][0]

# Incremental read: keep only rows that landed after the watermark
df_incremental = (
    spark.read.json("s3a://raw-logs/events/")
    .filter(col("arrival_ts") > last_processed_ts)
)
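One edge case worth guarding: on the very first run the mart is empty, `max(arrival_ts)` returns NULL (Python `None`), and the comparison silently filters out every row. A minimal guard — the `effective_watermark` helper and the sentinel date are our own convention:

```python
from datetime import datetime

def effective_watermark(last_processed_ts, sentinel=datetime(1970, 1, 1)):
    # SQL NULL arrives in Python as None; fall back to a sentinel
    # old enough to admit every file on the initial load.
    return last_processed_ts if last_processed_ts is not None else sentinel

effective_watermark(None)                   # → datetime(1970, 1, 1, 0, 0)
effective_watermark(datetime(2026, 1, 25))  # → datetime(2026, 1, 25, 0, 0)
```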
2. Modular Business Logic & Testing
We encapsulate transformations into functions. Before writing, we run a Quality Gate to check for null IDs or negative amounts.
def transform_sales(df, user_dim):
    # Inner join drops events without a matching user record
    return (
        df.join(user_dim, "user_id")
          .withColumn("total_price", col("qty") * col("unit_price"))
          .withColumn("dt", to_date(col("arrival_ts")))  # partition column for the load step
    )
def run_quality_checks(df):
    # Quality gate: halt before any bad rows reach the warehouse
    null_count = df.filter(col("order_id").isNull()).count()
    if null_count > 0:
        raise ValueError(f"Data Quality Failed: {null_count} null order_ids")
    negative_count = df.filter(col("total_price") < 0).count()
    if negative_count > 0:
        raise ValueError(f"Data Quality Failed: {negative_count} negative amounts")
# Execution
user_dim = spark.read.table("silver.users")
df_transformed = transform_sales(df_incremental, user_dim)
run_quality_checks(df_transformed)
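The gate generalizes naturally to a table of named checks. A plain-Python model of the pattern — the names `run_checks` and `CHECKS` are illustrative; in the real pipeline each predicate would become a DataFrame filter:

```python
def run_checks(rows, checks):
    # rows: list of dicts; checks: {name: predicate that must hold for every row}
    failures = {
        name: sum(1 for r in rows if not ok(r))
        for name, ok in checks.items()
    }
    failures = {name: n for name, n in failures.items() if n}
    if failures:
        raise ValueError(f"Data Quality Failed: {failures}")

CHECKS = {
    "order_id_not_null": lambda r: r["order_id"] is not None,
    "amount_non_negative": lambda r: r["total_price"] >= 0,
}

run_checks([{"order_id": 1, "total_price": 9.5}], CHECKS)  # passes silently
```

Keeping checks as data means adding a new rule is one dict entry, and the failure message names every rule that broke instead of just the first.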
3. Partitioned Load to Data Mart
To keep reads fast, we partition the mart by date with partitionBy. To update only the dates in the current batch, we use MERGE (if on Delta) or dynamic partition overwrite, so historical partitions are never rewritten.
/warehouse/sales_mart/dt=2026-01-26/part-001.parquet
# Dynamic mode: only the partitions present in df_transformed are overwritten
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df_transformed.write \
    .mode("overwrite") \
    .partitionBy("dt") \
    .parquet("s3a://warehouse/gold/sales_mart/")
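What dynamic mode buys us, modeled on a dict (a toy sketch of the semantics; a static overwrite would instead wipe the whole table before writing):

```python
def dynamic_overwrite(warehouse, batch):
    # warehouse: {dt: rows}. Only partitions present in the incoming
    # batch are replaced; untouched dates keep their existing data.
    incoming = {}
    for dt, row in batch:
        incoming.setdefault(dt, []).append(row)
    warehouse.update(incoming)
    return warehouse

mart = {"2026-01-25": ["a"], "2026-01-26": ["stale"]}
dynamic_overwrite(mart, [("2026-01-26", "b")])
# → {"2026-01-25": ["a"], "2026-01-26": ["b"]}
```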
Interview Q&A