Orchestrated Batch ETL | Spark Practical Scenarios

Orchestrating Batch ETL with Incremental Loads

Building a scalable, production-ready pipeline with dependency management and data quality gates.

In a production environment, we never reprocess all of the data on every run. We need a pipeline that identifies newly landed files in S3, joins them with the user dimension, computes the KPIs, and updates the final Data Mart. If the data quality checks fail, the pipeline must halt before polluting the warehouse.

We use the high-watermark pattern: check the latest processed timestamp in the target table and read only the data in S3 that arrived after that point.

Source S3 (JSON Logs)
/events/2026-01-25/file_a.json
/events/2026-01-26/file_b.json (New Data)
from pyspark.sql.functions import col, to_date

# Identify the high watermark (fall back to the epoch on the very first run)
last_processed_ts = spark.sql(
    "SELECT max(event_ts) FROM gold.sales_mart"
).collect()[0][0] or "1970-01-01 00:00:00"

# Incremental read: keep only events newer than the watermark
df_incremental = spark.read.json("s3a://raw-logs/events/") \
    .filter(col("event_ts") > last_processed_ts)

We encapsulate transformations into functions. Before writing, we run a Quality Gate to check for null IDs or negative amounts.

def transform_sales(df, user_dim):
    return df.join(user_dim, "user_id") \
             .withColumn("total_price", col("qty") * col("unit_price")) \
             .withColumn("dt", to_date(col("event_ts")))  # partition column for the mart

def run_quality_checks(df):
    null_count = df.filter(col("order_id").isNull()).count()
    if null_count > 0:
        raise ValueError(f"Data Quality Failed: Found {null_count} null order_ids")
    negative_count = df.filter(col("total_price") < 0).count()
    if negative_count > 0:
        raise ValueError(f"Data Quality Failed: Found {negative_count} negative amounts")

# Execution
user_dim = spark.read.table("silver.users")
df_transformed = transform_sales(df_incremental, user_dim)
run_quality_checks(df_transformed)
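
Because run_quality_checks raises, the write is never reached on bad data and the scheduler sees a failed task instead of a polluted mart. Below is a minimal driver sketch of that halt behaviour, reusing the names defined above; load_to_mart is a hypothetical stand-in for the write shown in the next section.

import sys
import logging

logger = logging.getLogger("sales_mart_etl")

def load_to_mart(df):
    """Hypothetical placeholder for the partitioned write shown below."""
    ...

if __name__ == "__main__":
    try:
        run_quality_checks(df_transformed)   # raises ValueError on bad data
        load_to_mart(df_transformed)         # reached only if the quality gate passes
    except ValueError as err:
        logger.error("Halting before the warehouse load: %s", err)
        sys.exit(1)                          # non-zero exit -> scheduler marks the run as failed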

To keep the warehouse fast, we partition the output with partitionBy. To update only the dates we just processed, we use MERGE (if the target is a Delta table) or dynamic partition overwrite.

Output: Partitioned Parquet
/warehouse/sales_mart/dt=2026-01-26/part-001.parquet
# Dynamic Partition Overwrite
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df_transformed.write \
    .mode("overwrite") \
    .partitionBy("dt") \
    .parquet("s3a://warehouse/gold/sales_mart/")
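
If the mart is a Delta table, the same update can be done with MERGE, which upserts by key and stays idempotent on reruns. A minimal sketch, assuming the delta-spark package is available, the mart already exists as a Delta table at the same path, and order_id is the business key (assumptions, not part of the scenario above):

from delta.tables import DeltaTable

# Upsert the incremental batch into the Delta mart
mart = DeltaTable.forPath(spark, "s3a://warehouse/gold/sales_mart/")

(mart.alias("t")
     .merge(df_transformed.alias("s"), "t.order_id = s.order_id")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())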
Q: Why use Incremental Loads instead of Full Refreshes? Full refreshes become prohibitively expensive and slow as data grows. Incremental loads reduce the "compute window," leading to lower costs and faster data availability (lower latency).
Q: What is the "High Watermark" pattern? It is a technique where the pipeline queries the metadata or the last record of the target table to find the maximum timestamp, ensuring only records newer than that are fetched in the next run.
Q: How do you handle schema evolution in these pipelines? Ideally, use Delta Lake with .option("mergeSchema", "true"). If using pure Parquet, you must manually coordinate schema changes or use a "Silver" layer to normalize schemas before they hit the "Gold" marts.
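
As a quick illustration of the Delta option mentioned above: with mergeSchema enabled, an append adds any new columns arriving in the incremental batch to the table schema instead of failing the write. The path and Delta setup are the same assumed ones as in the MERGE sketch.

# Append the batch and let Delta evolve the table schema for new columns
df_transformed.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("s3a://warehouse/gold/sales_mart/")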