Data Quality & Anomaly Detection: The Quarantine Pattern
Protecting downstream BI by routing high-quality data to production and invalid records to a rejected table.
In production, data is often "dirty." We are building a validator that inspects incoming vendor files. If a record has a duplicate order_id, a negative price, or a missing customer_id, it is marked as "Invalid" and sent to a Quarantine Table for manual review. Only "Clean" data reaches the Gold Warehouse.
1. Sample Input with Anomalies
Below is a typical batch containing a valid record, a negative price (anomaly), and a duplicate ID.
| order_id | price | status    | note                    |
|----------|-------|-----------|-------------------------|
| 1001     | 50.00 | SHIPPED   | Valid                   |
| 1002     | -5.00 | CANCELLED | Invalid: negative price |
| 1001     | 50.00 | SHIPPED   | Invalid: duplicate ID   |
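Before wiring anything into Spark, the row-level rules can be prototyped in plain Python against this exact batch. This is a sketch only; `tag_batch` is a hypothetical helper for illustration, not part of the pipeline below.

```python
# The sample batch above, as plain Python rows (a Spark-free prototype).
batch = [
    {"order_id": 1001, "price": 50.00, "status": "SHIPPED"},
    {"order_id": 1002, "price": -5.00, "status": "CANCELLED"},
    {"order_id": 1001, "price": 50.00, "status": "SHIPPED"},
]

def tag_batch(rows):
    """Tag each row with a quality flag; the first occurrence of an id wins."""
    seen, tagged = set(), []
    for row in rows:
        if row["order_id"] in seen:
            flag = "DUPLICATE"
        elif row["price"] is None or row["price"] <= 0:
            flag = "INVALID_PRICE"
        elif row["status"] not in {"SHIPPED", "PENDING", "CANCELLED"}:
            flag = "INVALID_STATUS"
        else:
            flag = "VALID"
        seen.add(row["order_id"])
        tagged.append({**row, "final_status": flag})
    return tagged
```

Running this over the batch flags the three rows as valid, invalid price, and duplicate, matching the annotations in the table above.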
2. Row-Level Validation
We use a UDF for row-level logic checks. For uniqueness, we use a window function to tag duplicates without dropping rows up front.
```python
from pyspark.sql.functions import col, current_timestamp, row_number, udf, when
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# 1. Anomaly Detection UDF
@udf(returnType=StringType())
def validate_record(price, status):
    if price is None or price <= 0:
        return "INVALID_PRICE"
    if status not in ["SHIPPED", "PENDING", "CANCELLED"]:
        return "INVALID_STATUS"
    return "VALID"

# 2. Tagging Duplicates via Windowing
window_spec = Window.partitionBy("order_id").orderBy("order_id")

df_tagged = (
    df_raw
    .withColumn("row_num", row_number().over(window_spec))
    .withColumn("quality_flag", validate_record(col("price"), col("status")))
    .withColumn(
        "final_status",
        when(col("row_num") > 1, "DUPLICATE").otherwise(col("quality_flag")),
    )
)
```
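The `row_number` semantics are easy to sanity-check in plain Python: within each `order_id` partition the first row gets 1 and every later row gets a higher number, which is what the `DUPLICATE` tag keys off. This is a sketch of the semantics, not Spark code; `add_row_numbers` is a hypothetical helper.

```python
from collections import defaultdict

def add_row_numbers(rows, key):
    """Mimic row_number().over(Window.partitionBy(key)): 1 for the first
    row seen per key, 2 for the next, and so on (input order as tiebreak)."""
    counts = defaultdict(int)
    out = []
    for row in rows:
        counts[row[key]] += 1
        out.append({**row, "row_num": counts[row[key]]})
    return out
```

One design note: because the window orders by `order_id` itself, all rows in a partition tie, so which duplicate "survives" as row 1 is arbitrary. If the feed carries an ingestion timestamp, ordering by it makes the kept row deterministic.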
3. The Split: Clean vs. Quarantine
We split the DataFrame into two logical paths: the pipeline keeps running, and "bad" data is safely set aside for the Data Ops team instead of failing the whole batch.
```python
# Filter Clean Data (drop the helper columns so they never reach Gold)
df_clean = (
    df_tagged
    .filter(col("final_status") == "VALID")
    .drop("row_num", "quality_flag", "final_status")
)

# Filter Bad Data (The Quarantine)
df_quarantine = (
    df_tagged
    .filter(col("final_status") != "VALID")
    .withColumn("rejection_reason", col("final_status"))
    .withColumn("rejection_ts", current_timestamp())
)
```
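The two filters are exact complements, so the split is a pure partition of `df_tagged`: no row is lost and none is double-counted. A plain-Python sketch of that invariant (the `split_records` helper is illustrative, not part of the pipeline):

```python
def split_records(tagged):
    """Partition tagged rows into (clean, quarantine) by final_status."""
    clean = [r for r in tagged if r["final_status"] == "VALID"]
    quarantine = [r for r in tagged if r["final_status"] != "VALID"]
    return clean, quarantine

# Invariant: len(clean) + len(quarantine) == len(tagged), always.
```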
4. Output Examples
The warehouse receives pure data, while the quarantine table acts as a log for vendor troubleshooting.
Gold table (`gold.sales`):

| order_id | price | status  |
|----------|-------|---------|
| 1001     | 50.00 | SHIPPED |

Quarantine table (`audit.quarantine_logs`):

| order_id | rejection_reason | rejection_ts |
|----------|------------------|--------------|
| 1002     | INVALID_PRICE    | 2026-01-27   |
| 1001     | DUPLICATE        | 2026-01-27   |
```python
# Write Clean Data
df_clean.write.format("delta").mode("append").saveAsTable("gold.sales")

# Write Quarantine Data
df_quarantine.write.format("delta").mode("append").saveAsTable("audit.quarantine_logs")

# Log KPI for Monitoring (each count() triggers a job; for large
# batches, cache df_tagged upstream to avoid recomputing the lineage)
print(f"Ingested {df_clean.count()} records. Quarantined {df_quarantine.count()} records.")
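The print above can be hardened into a monitoring check that pages Data Ops when a vendor file is unusually dirty. The helper names and the 5% threshold here are illustrative assumptions, not a standard.

```python
def quarantine_rate(clean_count, quarantined_count):
    """Fraction of the batch that was rejected (0.0 for an empty batch)."""
    total = clean_count + quarantined_count
    return quarantined_count / total if total else 0.0

def should_alert(clean_count, quarantined_count, threshold=0.05):
    """Flag the batch for manual review when the rejection rate
    exceeds the (illustrative) threshold."""
    return quarantine_rate(clean_count, quarantined_count) > threshold
```

Feeding the two `count()` values into `should_alert` turns the KPI log line into an actionable signal.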
Interview Q&A