Kafka + Spark
tl;dr: Exactly-once Kafka → Spark → Delta is solved in 2025
Exactly-Once · Checkpointing · Idempotency · Backpressure · 2025
2025 LAW
Kafka → Spark → Delta Lake
= The only production streaming pattern
The Bulletproof Kafka → Spark Query
from pyspark.sql.functions import col, from_json, current_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Example payload schema -- replace with the real fields of your topics
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
])

# Requires the spark-sql-kafka-0-10 connector package on the classpath
kafka_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-prod:9092")
    .option("subscribe", "user_events,orders")
    .option("startingOffsets", "earliest")           # dev
    # .option("startingOffsets", "latest")           # prod (or let the checkpoint decide)
    .option("failOnDataLoss", "false")               # survive topic deletion
    .option("kafka.security.protocol", "SASL_SSL")   # if needed
    .option("maxOffsetsPerTrigger", "100000")        # rate limiting / backpressure
    .load()
)

# Keep the Kafka metadata columns in the first select -- they are dropped otherwise
parsed = (kafka_df
    .select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schema).alias("json"),
        "topic", "partition", "offset", "timestamp",
    )
    .select("key", "json.*", "topic", "partition", "offset", "timestamp")
    .withColumn("processing_time", current_timestamp())
    .withColumn("date", to_date(col("timestamp")))   # partition column for the sink
)

query = (parsed
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3a://checkpoints/kafka-events/")
    .partitionBy("date", "topic")
    .trigger(processingTime="60 seconds")
    .outputMode("append")
    .start("s3a://delta/events_bronze")
)

query.awaitTermination()
Exactly-Once = Two Things Only
Checkpoint: Kafka offsets stored safely in S3/ADLS/GCS
Idempotent sink: Delta Lake (or foreachBatch + upsert, sketched below)
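If plain Delta appends are not enough (dedup, updates), foreachBatch with a MERGE keeps the write idempotent: a replayed micro-batch matches on the key and becomes a no-op. A minimal sketch, assuming the delta-spark package, a hypothetical target path s3a://delta/events_silver, and an event_id key in the payload:

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # MERGE on a business key turns replayed batches into no-ops instead of duplicates
    target = DeltaTable.forPath(spark, "s3a://delta/events_silver")  # hypothetical path
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.event_id = s.event_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = (parsed
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3a://checkpoints/kafka-events-upsert/")
    .trigger(processingTime="60 seconds")
    .start()
)

The checkpoint still records which offsets each batch covered, so checkpoint + idempotent MERGE is what delivers exactly-once end to end even if a batch is retried.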
Options That Save Your Job
# Production must-haves
.option("failOnDataLoss", "false") # survive topic recreation
.option("maxOffsetsPerTrigger", "50000") # backpressure
.option("minPartitions", "100") # parallelism
.option("includeHeaders", "true") # if you need headers
# Security (example: AWS MSK with IAM auth -- needs the aws-msk-iam-auth jar on the classpath)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "AWS_MSK_IAM")
.option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
.option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
Backpressure That Actually Works
maxOffsetsPerTrigger: hard cap on records read per micro-batch
trigger(processingTime="X seconds"): fixed micro-batch interval
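Size maxOffsetsPerTrigger from the trigger interval and the sink throughput you have actually measured, not from guesswork. A rough sizing sketch; every number below is an assumption, not a recommendation:

# Goal: each micro-batch should finish comfortably inside the trigger interval
trigger_seconds = 60            # matches trigger(processingTime="60 seconds")
max_records_per_second = 5_000  # measured sustainable sink throughput (assumed here)
safety_factor = 0.8             # headroom for skew and for catching up after downtime

max_offsets_per_trigger = int(trigger_seconds * max_records_per_second * safety_factor)
print(max_offsets_per_trigger)  # 240000 -> .option("maxOffsetsPerTrigger", "240000")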
Production Checklist (Never Lose Data)
Checkpoint on reliable storage (S3/ADLS/GCS)
Sink = Delta Lake (or idempotent foreachBatch)
maxOffsetsPerTrigger or trigger() set
failOnDataLoss=false
Monitoring: input rate, processing rate, lag
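For the monitoring item, the query's own progress report already exposes input rate, processing rate, and batch duration. A minimal polling sketch using query.lastProgress; run it from a separate thread or a monitoring job, since awaitTermination() blocks the driver:

import time

# Poll the running query and flag when processing falls behind the input rate
while query.isActive:
    p = query.lastProgress  # dict form of the latest StreamingQueryProgress, None before the first batch
    if p:
        input_rate = p.get("inputRowsPerSecond", 0.0)
        process_rate = p.get("processedRowsPerSecond", 0.0)
        batch_ms = p.get("durationMs", {}).get("triggerExecution", 0)
        if process_rate and input_rate > process_rate:
            print(f"Falling behind: in={input_rate:.0f}/s out={process_rate:.0f}/s batch={batch_ms}ms")
    time.sleep(60)

For Kafka consumer lag itself, compare the checkpointed offsets against the broker's latest offsets with your usual Kafka tooling; the progress report alone does not show it on every Spark version.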
FINAL ANSWER:
Kafka → Spark → Delta
With checkpoint + idempotent write
Exactly-once. Forever.
No exceptions.