Structured Streaming TL;DR

Structured Streaming

/tldr: Exactly-once, stateful, fault-tolerant — just write batch code


2025 LAW

Structured Streaming + Delta Lake
= The only production streaming stack

The Golden Streaming Query (2025)

# 0. Imports + payload schema (adjust the fields to your events)
from pyspark.sql.functions import col, count, current_timestamp, from_json, window
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),   # must be a timestamp for the watermark
])

# 1. Read stream
stream_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")   # or "latest"
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("processing_time", current_timestamp())
    .withWatermark("event_time", "10 minutes")   # bounds state (see below)
)

# 2. Transform exactly like batch
windowed = (stream_df
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("user_id")
    )
    .agg(count("*").alias("events"))
)

# 3. Write stream (exactly-once!)
query = (windowed
    # partition columns must be top-level, so pull start out of the window struct
    .withColumn("window_start", col("window.start"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events_agg")
    .partitionBy("window_start")
    .outputMode("append")        # or "update" / "complete"
    .trigger(processingTime="2 minutes")
    .start("/delta/events_agg")
)

query.awaitTermination()

Output Modes — Never Guess Again

append   · Default · Only new, finalized rows · Use with a watermark
update   · Only rows that changed this trigger · Stateful aggregations
complete · Whole result table rewritten every trigger · Small state only!
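To see the modes in action, here is a minimal sketch that reuses stream_df from the golden query above and writes to the console sink (local debugging only; each .start() launches an independent query):

from pyspark.sql.functions import col, count, window

# stream_df already carries a 10-minute watermark on event_time
counts = (stream_df
    .groupBy(window(col("event_time"), "5 minutes"))
    .agg(count("*").alias("events"))
)

# append: a window is emitted once, after the watermark finalizes it
counts.writeStream.outputMode("append").format("console").start()

# update: a window is re-emitted on every trigger in which its count changed
counts.writeStream.outputMode("update").format("console").start()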

Watermark = Your Safety Net

# Without a watermark → state grows forever → OOM
# With a watermark → state older than the threshold is auto-dropped
.withWatermark("event_time", "2 hours")   # accept events up to 2 hours late

# Streaming dedup (its state is also bounded by the watermark)
.dropDuplicates(["event_id", "event_time"])   # idempotency
Rule: Always set watermark = max expected delay + buffer
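For deduplication specifically, Spark 3.5+ adds dropDuplicatesWithinWatermark, which evicts dedup state as soon as the watermark allows. A sketch using stream_df and the event_id field from above:

# Spark 3.5+: duplicates of event_id arriving within ~2 hours of each
# other are dropped; older dedup state is evicted by the watermark.
deduped = (stream_df
    .withWatermark("event_time", "2 hours")
    .dropDuplicatesWithinWatermark(["event_id"])
)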

Exactly-Once Sources & Sinks (2025)

Sources
  • Kafka (offsets tracked in the checkpoint)
  • Delta Lake
  • Kinesis, Azure Event Hubs
Sinks
  • Delta Lake (gold standard)
  • Kafka (at-least-once only; dedupe downstream)
  • foreachBatch() + idempotent upsert (e.g. JDBC or MERGE, sketch below)
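Below is a sketch of the foreachBatch() pattern for an idempotent sink, here a Delta MERGE keyed on user_id + window; the path and merge keys are illustrative assumptions, and the same shape applies to a JDBC upsert:

from delta.tables import DeltaTable   # delta-spark package

def upsert_to_delta(batch_df, batch_id):
    # MERGE makes replayed micro-batches idempotent: re-running a batch
    # after a failure updates existing rows instead of duplicating them.
    target = DeltaTable.forPath(spark, "/delta/events_agg")
    (target.alias("t")
        .merge(batch_df.alias("s"),
               "t.user_id = s.user_id AND t.window = s.window")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(windowed.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/checkpoints/events_agg_upsert")
    .start())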

Production Checklist (Never Forget)

• Checkpoint location on reliable storage (S3, ADLS, GCS)
• Watermark set on every stateful query
• Idempotent writes (Delta, or foreachBatch + upsert)
• Explicit trigger: processingTime="X minutes" (or availableNow=True for incremental batch; continuous mode is experimental and does not support aggregations)
• Monitoring: input lag, processing rate, state size (sketch below)
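For the monitoring item, the built-in progress report already exposes rate and state size. A polling sketch against query from the golden example (key names follow the StreamingQueryProgress JSON and may vary slightly by version):

import time

while query.isActive:
    p = query.lastProgress   # dict, or None before the first trigger
    if p:
        print("rows/sec:", p.get("processedRowsPerSecond"),
              "| input rows:", p.get("numInputRows"),
              # rows held in state per stateful operator (kept bounded by the watermark)
              "| state rows:", [s.get("numRowsTotal") for s in p.get("stateOperators", [])])
    time.sleep(60)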

FINAL ANSWER:

Structured Streaming + Delta Lake
= Exactly-once streaming that just works

Write batch code.
Get streaming superpowers.

Spark 3.5+ • Delta Lake mandatory in production • Databricks, EMR, GCP, Azure • 2025 standard