Processing Multi-Folder IoT Telemetry.
Integrating asynchronous sensor metrics and device events into a unified Delta Lake architecture.
An IoT platform emits data into two distinct S3 hierarchies: raw/metrics (sensor readings) and raw/events (system logs). The metrics contain high-precision nested JSON, while events are flat strings. Because sensors and logs record at slightly different times, we must implement a Time-Windowed Join to associate a sensor's temperature with the device's state at that moment.
1. Input Data Archetypes
Metric (raw/metrics): device_id: DEV_01, payload: {"temp": 23.55, "status": {"error": "none", "ts": "2026-01-27T21:30:05Z"}}
Event (raw/events): device_id: DEV_01, event_type: "REBOOT", log_ts: "2026-01-27T21:30:10Z"
2. Defining a Strict Metrics Schema
We define a strict schema for the metrics using Decimal(10,2), ensuring sensor precision is preserved without floating-point rounding errors.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Define Schema for Nested Metrics
metric_schema = StructType([
    StructField("temp", DecimalType(10, 2)),
    StructField("status", StructType([
        StructField("error", StringType()),
        StructField("ts", TimestampType())
    ]))
])
# Process Metrics
df_metrics = spark.read.parquet("s3://bucket/raw/metrics/*") \
    .withColumn("data", from_json(col("payload"), metric_schema)) \
    .select("device_id",
            col("data.temp").alias("temperature"),
            col("data.status.error").alias("error_code"),
            col("data.status.ts").alias("metric_ts")) \
    .na.fill({"temperature": 0.0, "error_code": "unknown"})
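Why Decimal(10,2) rather than a plain double? Binary floats cannot represent most decimal fractions exactly, which corrupts aggregated sensor values. A minimal illustration with Python's decimal module:

```python
from decimal import Decimal

# Floating point carries representation error
float_sum = 0.1 + 0.2
print(float_sum)   # 0.30000000000000004

# Decimal arithmetic stays exact
dec_sum = Decimal("0.1") + Decimal("0.2")
print(dec_sum)     # 0.3

# Quantize to two decimal places, mirroring Decimal(10,2)
reading = Decimal("23.555").quantize(Decimal("0.01"))
print(reading)     # 23.56
```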
3. Cleaning Events & Normalizing Time
Event logs often have inconsistent timestamp formats. We use regexp_replace to clean the strings before casting to the standard Timestamp type.
# Process Events
df_events = spark.read.parquet("s3://bucket/raw/events/*") \
    .withColumn("clean_ts", regexp_replace(col("log_ts"), "/", "-")) \
    .withColumn("event_ts", to_timestamp(col("clean_ts"))) \
    .filter(col("device_id").isNotNull()) \
    .select("device_id", "event_type", "event_ts")
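The normalization step can be sketched in plain Python with the same idea: regex substitutions first, then a single strict parse. The input formats here are hypothetical examples of the kind of inconsistency meant above:

```python
import re
from datetime import datetime, timezone

def normalize_ts(raw: str) -> datetime:
    """Normalize a few inconsistent timestamp spellings to a UTC datetime.
    The cleanup rules are illustrative, not exhaustive."""
    s = raw.strip()
    s = re.sub(r"/", "-", s)             # 2026/01/27 -> 2026-01-27
    s = re.sub(r"\s+", "T", s, count=1)  # space separator -> ISO "T"
    s = re.sub(r"Z$", "+00:00", s)       # trailing Z -> explicit UTC offset
    return datetime.fromisoformat(s)

print(normalize_ts("2026/01/27 21:30:10Z"))
# 2026-01-27 21:30:10+00:00
```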
4. The Asynchronous Time-Window Join
In IoT, metrics and events rarely have identical timestamps. We join on device_id and apply a 60-second window constraint, associating each metric reading with any event recorded within a minute of it. Note that this matches every event inside the window, not just the closest one: if several events fall within 60 seconds of a reading, the join emits one row per match, so deduplicate downstream if a single match is required.
# Perform Time-Windowed Left Join
df_integrated = df_metrics.join(
    df_events,
    (df_metrics.device_id == df_events.device_id) &
    (abs(unix_timestamp(df_metrics.metric_ts) - unix_timestamp(df_events.event_ts)) < 60),
    "left"
).drop(df_events.device_id)
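The window logic above can be sketched in plain Python on a handful of in-memory rows (hypothetical data mirroring df_metrics and df_events). This version goes one step further than the Spark join and keeps only the single closest event per reading:

```python
from datetime import datetime, timedelta

metrics = [
    {"device_id": "DEV_01", "temperature": 23.55, "metric_ts": datetime(2026, 1, 27, 21, 30, 5)},
]
events = [
    {"device_id": "DEV_01", "event_type": "REBOOT",   "event_ts": datetime(2026, 1, 27, 21, 30, 10)},
    {"device_id": "DEV_01", "event_type": "SHUTDOWN", "event_ts": datetime(2026, 1, 27, 21, 35, 0)},
]

def closest_event(metric, events, window=timedelta(seconds=60)):
    """Left-join semantics: nearest same-device event inside the window, else None."""
    candidates = [e for e in events
                  if e["device_id"] == metric["device_id"]
                  and abs(e["event_ts"] - metric["metric_ts"]) < window]
    return min(candidates, key=lambda e: abs(e["event_ts"] - metric["metric_ts"]), default=None)

for m in metrics:
    e = closest_event(m, events)
    print(m["device_id"], m["temperature"], e["event_type"] if e else None)
# DEV_01 23.55 REBOOT
```

The REBOOT event (5 s away) matches; the SHUTDOWN event (295 s away) falls outside the window and is ignored.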
5. Writing Intermediate & Final Delta Merge
First, we write the processed output as Parquet to S3 for long-term storage compatibility. Then we use MERGE INTO against the Unity Catalog table so that a retried job does not insert duplicate telemetry rows. Note that Delta's MERGE fails if multiple source rows match one target row, so deduplicate on (device_id, metric_ts) first if the time-window join produced multiple matches.
# Intermediate Write (S3)
df_integrated.write.mode("overwrite").parquet("s3://bucket/processed/telemetry_cleaned/")
# Final Upsert into Delta Lake (Unity Catalog)
# Assuming 'telemetry_final' is a Delta table partitioned by date
spark.read.parquet("s3://bucket/processed/telemetry_cleaned/") \
.createOrReplaceTempView("source_view")
spark.sql("""
    MERGE INTO main.iot_catalog.telemetry_final AS target
    USING source_view AS source
    ON target.device_id = source.device_id AND target.metric_ts = source.metric_ts
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
Interview Q&A