Telemetry Data Pipeline | Spark Practical Scenarios

Processing Multi-Folder IoT Telemetry

Integrating asynchronous sensor metrics and device events into a unified Delta Lake architecture.

An IoT platform emits data into two distinct S3 hierarchies: raw/metrics (sensor readings) and raw/events (system logs). The metrics contain high-precision nested JSON, while events are flat strings. Because sensors and logs record at slightly different times, we must implement a Time-Windowed Join to associate a sensor's temperature with the device's state at that moment.

Metrics (JSON Payload)
device_id: DEV_01, payload: {"temp": 23.55, "status": {"error": "none", "ts": "2026-01-27T21:30:05Z"}}
Events (String Logs)
device_id: DEV_01, event_type: "REBOOT", log_ts: "2026-01-27T21:30:10Z"

We define a strict schema for the metrics so that readings are parsed as DecimalType(10,2), preserving sensor precision without floating-point errors.

# Note: importing abs here shadows Python's builtin abs with the Column version
from pyspark.sql.functions import abs, col, from_json, regexp_replace, to_timestamp, unix_timestamp
from pyspark.sql.types import DecimalType, StringType, StructField, StructType, TimestampType

# Define Schema for Nested Metrics
metric_schema = StructType([
    StructField("temp", DecimalType(10,2)),
    StructField("status", StructType([
        StructField("error", StringType()),
        StructField("ts", TimestampType())
    ]))
])

# Process Metrics
df_metrics = spark.read.parquet("s3://bucket/raw/metrics/*") \
    .withColumn("data", from_json(col("payload"), metric_schema)) \
    .select("device_id", 
            col("data.temp").alias("temperature"),
            col("data.status.error").alias("error_code"),
            col("data.status.ts").alias("metric_ts")) \
    .na.fill({"temperature": 0.0, "error_code": "unknown"})
    

Event logs often have inconsistent timestamp formats. We use regexp_replace to clean the strings before casting to the standard Timestamp type.

# Process Events
# Normalize slash-delimited dates (e.g. 2026/01/27) before casting to timestamp
df_events = spark.read.parquet("s3://bucket/raw/events/*") \
    .withColumn("log_ts", regexp_replace(col("log_ts"), "/", "-")) \
    .withColumn("event_ts", to_timestamp(col("log_ts"))) \
    .filter(col("device_id").isNotNull()) \
    .select("device_id", "event_type", "event_ts")
    

In IoT, metrics and events rarely have identical timestamps. We join on device_id and apply a 60-second window constraint to find the closest event for every metric reading.

# Perform Time-Windowed Left Join
df_integrated = df_metrics.join(
    df_events,
    (df_metrics.device_id == df_events.device_id) & 
    (abs(unix_timestamp(df_metrics.metric_ts) - unix_timestamp(df_events.event_ts)) < 60),
    "left"
).drop(df_events.device_id)
    

First, we write the processed data to S3 as Parquet for long-term storage compatibility. Then we run MERGE INTO against the Unity Catalog-registered Delta table so that a retried job upserts rather than inserting duplicate telemetry rows.

# Intermediate Write (S3)
df_integrated.write.mode("overwrite").parquet("s3://bucket/processed/telemetry_cleaned/")

# Final Upsert into Delta Lake (Unity Catalog)
# Assuming 'telemetry_final' is a Delta table partitioned by date
spark.read.parquet("s3://bucket/processed/telemetry_cleaned/") \
    .createOrReplaceTempView("source_view")

spark.sql("""
    MERGE INTO main.iot_catalog.telemetry_final AS target
    USING source_view AS source
    ON target.device_id = source.device_id AND target.metric_ts = source.metric_ts
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
    
Q: Why use DecimalType(10,2) instead of DoubleType for sensor readings?
A: DoubleType is a binary floating-point type, which can introduce precision loss during aggregations (e.g., a stored 23.55 drifting toward 23.549999…). DecimalType is fixed-point and is the right choice for financial and scientific telemetry where exact values are non-negotiable.
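
The precision difference is easy to reproduce in plain Python, independent of Spark:

```python
from decimal import Decimal

# Binary floating point cannot represent most decimal fractions exactly
print(0.1 + 0.2)         # 0.30000000000000004
print(0.1 + 0.2 == 0.3)  # False

# Fixed-point decimals preserve the written value exactly
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True

# The same drift can appear when summing many sensor readings as doubles,
# while the decimal sum stays exact
exact = sum([Decimal("23.55")] * 1000)
print(exact)  # 23550.00
```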
Q: How does Unity Catalog improve this pipeline?
A: Unity Catalog provides a centralized governance layer. It lets us apply column-level security (e.g., masking device_id for certain users) and provides data lineage, so we can trace exactly which S3 Parquet file generated a specific Delta Lake record.
Q: What is the performance impact of the abs(timestamp_diff) join?
A: This is a non-equi join, which in the worst case degenerates into a Cartesian product. However, because the condition also includes an equality on device_id, Spark can still plan a SortMergeJoin or broadcast join on that key and evaluate the time-window predicate as a residual filter, drastically limiting the search space.