Data Quality & Quarantine | Spark Practical Scenarios

Data Quality & Anomaly Detection: The Quarantine Pattern.

Protecting downstream BI by routing high-quality data to production and invalid records to a rejected table.

In production, data is often "dirty." We are building a validator that inspects incoming vendor files. If a record has a duplicate order_id, a negative price, or a missing customer_id, it is marked as "Invalid" and sent to a Quarantine Table for manual review. Only "Clean" data reaches the Gold Warehouse.
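The routing rules above can be sketched in plain Python before wiring them into Spark. This is a minimal illustration only; the record fields mirror the scenario and `classify_record` is a hypothetical helper, not part of the pipeline code below.

```python
# Spark-free sketch of the three routing rules described above:
# duplicate order_id, negative price, missing customer_id.

def classify_record(record, seen_order_ids):
    """Return 'VALID' or a rejection reason for one incoming record."""
    if record.get("customer_id") is None:
        return "MISSING_CUSTOMER_ID"
    if record.get("price") is None or record["price"] <= 0:
        return "INVALID_PRICE"
    if record["order_id"] in seen_order_ids:
        return "DUPLICATE"
    seen_order_ids.add(record["order_id"])
    return "VALID"

batch = [
    {"order_id": 1001, "price": 50.00, "customer_id": "C1"},
    {"order_id": 1002, "price": -5.00, "customer_id": "C2"},
    {"order_id": 1001, "price": 50.00, "customer_id": "C1"},
]
seen = set()
flags = [classify_record(r, seen) for r in batch]
# flags == ["VALID", "INVALID_PRICE", "DUPLICATE"]
```

Only valid records would continue toward the warehouse; anything else is routed to quarantine with its reason attached.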

Below is a typical batch containing valid data, a negative value (anomaly), and a duplicate ID.

Vendor Sales Feed (S3)
| order_id | price | status | note |
| --- | --- | --- | --- |
| 1001 | 50.00 | SHIPPED | Valid |
| 1002 | -5.00 | CANCELLED | Invalid: negative price |
| 1001 | 50.00 | SHIPPED | Invalid: duplicate ID |

We use a UDF for the row-level checks. For uniqueness, we use a window function to tag duplicates without dropping any rows up front.

from pyspark.sql.functions import col, current_timestamp, row_number, udf, when
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# 1. Anomaly Detection UDF
@udf(returnType=StringType())
def validate_record(price, status):
    if price is None or price <= 0:
        return "INVALID_PRICE"
    if status not in ["SHIPPED", "PENDING", "CANCELLED"]:
        return "INVALID_STATUS"
    return "VALID"

# 2. Tagging Duplicates via Windowing
# orderBy on the partition key is arbitrary; order by an ingestion timestamp if you need a deterministic survivor
window_spec = Window.partitionBy("order_id").orderBy("order_id")
df_tagged = df_raw.withColumn("row_num", row_number().over(window_spec)) \
    .withColumn("quality_flag", validate_record(col("price"), col("status"))) \
    .withColumn("final_status", 
        when(col("row_num") > 1, "DUPLICATE")
        .otherwise(col("quality_flag"))
    )
    

We split the DataFrame into two logical paths. The pipeline keeps running instead of failing on bad input, and the rejected rows are safely set aside for the Data Ops team.

# Filter Clean Data
df_clean = df_tagged.filter(col("final_status") == "VALID").drop("row_num", "quality_flag", "final_status")

# Filter Bad Data (The Quarantine)
df_quarantine = df_tagged.filter(col("final_status") != "VALID") \
    .withColumnRenamed("final_status", "rejection_reason") \
    .withColumn("rejection_ts", current_timestamp()) \
    .drop("row_num", "quality_flag")
    

The warehouse receives pure data, while the quarantine table acts as a log for vendor troubleshooting.

Target: Gold Sales Table (Delta)
| order_id | price | status |
| --- | --- | --- |
| 1001 | 50.00 | SHIPPED |

Target: Rejected_Records (Delta)
| order_id | rejection_reason | rejection_ts |
| --- | --- | --- |
| 1002 | INVALID_PRICE | 2026-01-27 |
| 1001 | DUPLICATE | 2026-01-27 |

# Write Clean Data
df_clean.write.format("delta").mode("append").saveAsTable("gold.sales")

# Write Quarantine Data
df_quarantine.write.format("delta").mode("append").saveAsTable("audit.quarantine_logs")

# Log KPI for Monitoring (each count() triggers a job; cache df_tagged upstream if the lineage is expensive)
print(f"Ingested {df_clean.count()} records. Quarantined {df_quarantine.count()} records.")
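A lightweight guardrail can sit on top of that KPI: alert when the quarantine rate spikes instead of letting the audit table fill silently. A pure-Python sketch; the 10% threshold and function names are illustrative assumptions, not part of the pipeline above.

```python
# Hypothetical guardrail: alert when the rejected share of a batch crosses a threshold.

def quarantine_rate(clean_count, quarantined_count):
    """Fraction of the batch that was quarantined."""
    total = clean_count + quarantined_count
    return 0.0 if total == 0 else quarantined_count / total

def should_alert(clean_count, quarantined_count, threshold=0.10):
    """True when more than `threshold` of the batch was quarantined."""
    return quarantine_rate(clean_count, quarantined_count) > threshold

# The sample batch from this scenario: 1 clean row, 2 quarantined rows
rate = quarantine_rate(1, 2)    # ~0.667
alert = should_alert(1, 2)      # True: well above the 10% default
```

The counts would come straight from the `df_clean.count()` / `df_quarantine.count()` values already logged.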
    
Q: Why use a UDF instead of standard Spark SQL functions for validation?
A: While native Spark functions are faster (vectorized and optimizer-friendly), UDFs allow highly complex, multi-column business logic that is easier to maintain and unit-test when validation involves 10+ edge cases or external library checks.
Q: How do you handle the performance bottleneck of UDFs?
A: Use Pandas UDFs (vectorized UDFs) where the logic is columnar. Alternatively, pre-filter with native Spark functions so the UDF only processes the rows that actually need it.
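The vectorized alternative can be prototyped with plain pandas; the same function body could then be wrapped with `pandas_udf` from `pyspark.sql.functions`. A sketch under the assumption that pandas is available; `validate_batch` is an illustrative name:

```python
import pandas as pd

# Vectorized validation kernel: operates on whole columns at once,
# with no per-row Python function calls. Inside Spark, this body could
# be decorated with @pandas_udf(StringType()).
def validate_batch(price: pd.Series, status: pd.Series) -> pd.Series:
    valid_statuses = {"SHIPPED", "PENDING", "CANCELLED"}
    result = pd.Series("VALID", index=price.index)
    result[~status.isin(valid_statuses)] = "INVALID_STATUS"
    result[price.isna() | (price <= 0)] = "INVALID_PRICE"  # price check wins, matching the row UDF
    return result

prices = pd.Series([50.00, -5.00, None])
statuses = pd.Series(["SHIPPED", "CANCELLED", "LOST"])
flags = validate_batch(prices, statuses)
# flags → ["VALID", "INVALID_PRICE", "INVALID_PRICE"]
```

Applying the price mask last preserves the precedence of the original row-level UDF, where a bad price is reported before a bad status.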
Q: What is the benefit of the Quarantine Pattern over just dropping bad rows?
A: Dropping rows causes silent data loss. If a vendor sends a file where 50% of the data is bad, the quarantine table lets you prove it to them, fix the root cause, and re-process those specific rows later.
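Re-processing from quarantine can be as simple as selecting rows whose root cause has been fixed and feeding them back through the same validator. A hedged pure-Python sketch; the field names follow the Rejected_Records table and `replay_candidates` is a hypothetical helper:

```python
# Hypothetical replay step: pick quarantined rows with a now-fixed reason
# and re-submit them to the normal validation pipeline.

quarantine = [
    {"order_id": 1002, "rejection_reason": "INVALID_PRICE", "price": -5.00},
    {"order_id": 1001, "rejection_reason": "DUPLICATE", "price": 50.00},
]

def replay_candidates(rows, fixed_reason):
    """Rows eligible for re-processing once the vendor fixed `fixed_reason`."""
    return [r for r in rows if r["rejection_reason"] == fixed_reason]

to_replay = replay_candidates(quarantine, "INVALID_PRICE")
# to_replay contains only order 1002; it would still be re-validated before reaching gold
```

In Spark this would be a `filter` on the quarantine Delta table followed by the same validation job, so fixed rows never bypass the checks.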