Apache Spark Structured Streaming Sources & Sinks: Input/Output Mastery (2025 Guide)

In Structured Streaming, the magic doesn't start until you connect your pipeline to the outside world. Sources are the eyes and ears of your streaming job — they pull in live data from Kafka topics, new files dropping into S3 buckets, TCP sockets, or even synthetic test streams. Sinks are the mouth — they push your transformed results out to Delta Lake tables, Kafka for downstream consumers, consoles for debugging, or custom functions for alerting.

Unlike the older DStream API, everything here is declarative: you specify the source or sink with options, and Spark handles incremental reads and writes, checkpointing of progress, and fault recovery. No manual offset management, no worrying about partial commits. The ecosystem has also grown well beyond the built-in sources: Kinesis, Pulsar, and Azure Event Hubs are available through connectors, and Delta Lake integrates natively on both the read and write side.

We'll walk through the most battle-tested ones with code you can copy-paste today. By the end, you'll know exactly how to wire up production-grade inputs (e.g., Kafka at 1M events/sec) and outputs (e.g., exactly-once writes to Delta without duplicates). Let's dive in.

Input Sources: Where Your Stream Begins

Sources define what feeds your pipeline and how fast. Spark auto-discovers new data (e.g., polling directories for files) and tracks progress via checkpoints, so restarts pick up exactly where you left off. Most support schema enforcement (no surprises from malformed JSON), and you can tune throughput with options like maxFilesPerTrigger or maxOffsetsPerTrigger.

1. File Sources (The Workhorse for Logs & ETL)

Perfect for "directory tailing" — drop JSON/CSV/Parquet files into a folder (S3, HDFS, local), and Spark processes them as they land. It ignores already-seen files via checkpoints. Great for low-latency ETL from app logs or IoT batches.

Key Options:

  • path: The directory to watch

  • maxFilesPerTrigger: Limit files per micro-batch (e.g., 1 for demos, 1000+ for prod)

  • latestFirst: Process newest files first (default: false)

  • fileNameOnly: Dedup already-seen files by filename only, instead of the full path (default: false)

Python Example (processing new JSON activity logs):

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("FileStream").getOrCreate()

# Streaming read from directory
streaming_df = (spark
    .readStream
    .schema("gt STRING, x DOUBLE, y DOUBLE, z DOUBLE, Arrival_Time LONG")  # Enforce schema!
    .option("maxFilesPerTrigger", 5)  # Tune for your rate
    .json("/path/to/logs/"))  # Watches for new .json files

# Quick aggregation
counts = streaming_df.groupBy("gt").count()

# Output to console for now
query = (counts
    .writeStream
    .queryName("file_activity")
    .format("console")
    .outputMode("complete")
    .start())

query.awaitTermination()  # Production: run forever

Pro Tip: For S3, read through the Hadoop S3A connector (s3a:// paths) rather than older schemes for better performance. The path also accepts glob patterns (e.g., /logs/2025/*/events-*.json), and recursiveFileLookup can pick up nested directories.
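
As a quick sketch of the glob idea (the bucket, prefix, and schema below are placeholders for your own):

# Only files matching the glob under the S3 prefix are picked up
glob_df = (spark
    .readStream
    .schema("gt STRING, x DOUBLE, y DOUBLE, z DOUBLE, Arrival_Time LONG")
    .option("maxFilesPerTrigger", 100)
    .json("s3a://my-bucket/logs/2025/*/events-*.json"))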

2. Kafka Source (The Enterprise Standard)

Kafka is king for high-throughput, durable messaging. Structured Streaming integrates natively with Kafka brokers 0.10.0 and higher via the spark-sql-kafka-0-10 connector. Offsets are tracked in the checkpoint rather than in Kafka consumer groups, so replays after failure are deterministic and give end-to-end exactly-once when paired with an idempotent or transactional sink. Payloads arrive as raw bytes; parse JSON with from_json, Avro with from_avro, or Protobuf with from_protobuf (Spark 3.4+), optionally backed by a schema registry.

Key Options:

  • kafka.bootstrap.servers: Comma-separated brokers (e.g., "broker1:9092,broker2:9092")

  • subscribe: Topic names (e.g., "events" or "topic1,topic2")

  • startingOffsets: "earliest", "latest", or a JSON string of per-topic-partition offsets

  • maxOffsetsPerTrigger: Cap messages per batch (e.g., 10000 to avoid OOM)

Python Example (reading user clicks from Kafka):

from pyspark.sql.functions import from_json, col
from pyspark.sql.avro.functions import from_avro  # Only needed if the payload is Avro

kafka_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "latest")  # Or "earliest" for backfill
    .load())

# Parse value column (assuming JSON)
clicks = (kafka_df
    .select(from_json(col("value").cast("string"), 
                      schema="user_id STRING, page STRING, timestamp LONG").alias("data"))
    .select("data.*"))

# Live count per page
page_counts = clicks.groupBy("page").count()

query = (page_counts
    .writeStream
    .format("console")
    .outputMode("complete")
    .start())

Updates: If producers use Kafka transactions, set kafka.isolation.level to read_committed so records from aborted transactions are skipped. For schema evolution, pair with Confluent Schema Registry. Topics must be named up front via subscribe, subscribePattern, or assign; use subscribePattern to pick up new topics matching a regex.
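
A sketch combining those finer-grained options (broker addresses, topic pattern, and offsets are placeholders):

kafka_tuned_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribePattern", "clicks.*")  # Regex over topic names; matches new topics too
    .option("startingOffsets", """{"clicks.web":{"0":23,"1":-2}}""")  # Per-partition; -2 = earliest, -1 = latest
    .option("maxOffsetsPerTrigger", 10000)  # Cap records per micro-batch
    .option("kafka.isolation.level", "read_committed")  # Skip records from aborted transactions
    .load())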

3. Socket Source (Debugging & Testing Gold)

Connects to a TCP socket and reads line-delimited text. Ideal for local testing: run nc -lk 9999 in one terminal and type messages to simulate a stream.

Key Options:

  • host: Host to connect to (required)

  • port: Port to connect to (required)

Python Example:

from pyspark.sql.functions import explode, split, col

socket_df = (spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())

# Split lines into words
words = (socket_df
    .select(explode(split(col("value"), " ")).alias("word"))
    .groupBy("word").count())

query = (words
    .writeStream
    .format("console")
    .outputMode("complete")
    .start())

Pro Tip: Use telnet localhost 9999 or Netcat to send lines like "hello world spark". Not for prod: all data flows through the driver and the source offers no fault-tolerance guarantees.

4. Rate Source (Synthetic Load Testing)

Generates fake rows at a fixed rate. Perfect for benchmarking or demos without real data.

Key Options:

  • rowsPerSecond: Events/sec (default: 1)

  • numPartitions: Parallelism (default: spark.default.parallelism)

  • rampUpTime: Seconds to reach full rate

Python Example:

rate_df = (spark.readStream.format("rate").option("rowsPerSecond", 10).load())
query = (rate_df.writeStream.format("console").start())
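
The rate source emits two columns, timestamp and value; a sketch of deriving synthetic fields from them for more realistic load tests (the modulus values are arbitrary):

from pyspark.sql.functions import col

synthetic = (rate_df
    .withColumn("user_id", (col("value") % 100).cast("string"))   # 100 fake users
    .withColumn("event_type", (col("value") % 3).cast("string")))  # 3 fake event types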

Other Sources (The Expanding Ecosystem)

  • Kinesis (AWS): format("kinesis") via external connectors (e.g., the Databricks Kinesis connector); options cover the stream name, region, and credentials, and newer connectors support enhanced fan-out.

  • Pulsar: format("pulsar") via the StreamNative pulsar-spark connector — a great alternative to Kafka.

  • Event Hubs (Azure): Official connector via format("eventhubs"); Event Hubs also exposes a Kafka-compatible endpoint, so the Kafka source works too.

  • JDBC: There is no built-in streaming JDBC source; for database change capture, stream CDC events from Debezium through Kafka instead.

  • Custom: Implement the DataSource V2 streaming read interfaces (e.g., MicroBatchStream) for anything else.

Per the Databricks docs, Delta Live Tables builds on these same sources to define unified batch + streaming pipelines.

Output Sinks: Where Your Insights Land

Sinks handle writing results, with output modes (append/update/complete) controlling what gets written on each trigger. Checkpoints record which micro-batches have already been committed, so after a failure Spark re-runs only the unfinished batch; paired with idempotent or transactional sinks (files, Delta), that gives end-to-end exactly-once output.

1. File Sink (Simple, Scalable Storage)

Writes to a directory in Parquet/JSON/CSV. The file sink supports only append mode, so each micro-batch adds new files; aggregations need an event-time watermark before they can be appended, so this sink is most natural for raw or parsed event streams. Use Delta for ACID.

Key Options:

  • path: Output directory

  • checkpointLocation: Required for fault-tolerance (separate dir)

Python Example (appending parsed click events as Parquet):

query = (clicks
    .writeStream
    .format("parquet")
    .option("path", "/output/clicks")
    .option("checkpointLocation", "/checkpoints/clicks")
    .outputMode("append")  # The file sink is append-only
    .trigger(processingTime="10 seconds")  # Batch every 10s
    .start())

Pro Tip: For S3, prefer Delta (format("delta")) and compact regularly (OPTIMIZE or auto-compaction) to avoid small-file problems.
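
A minimal sketch of that Delta route, assuming the delta-spark package is available on the cluster and the paths are placeholders:

query = (clicks
    .writeStream
    .format("delta")
    .option("path", "/output/clicks_delta")
    .option("checkpointLocation", "/checkpoints/clicks_delta")
    .outputMode("append")
    .start())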

2. Kafka Sink (Downstream Pub/Sub)

Publishes results back to Kafka. Delivery is at-least-once: a retried micro-batch can re-publish records, so have downstream consumers dedup or tolerate duplicates. The DataFrame supplies the payload as key/value columns (and optionally a topic column).

Key Options:

  • kafka.bootstrap.servers: Brokers

  • topic: Output topic

  • key: Supplied as a DataFrame column rather than an option; used for partitioning (optional, value is required)

Python Example:

query = (page_counts
    .selectExpr("CAST(page AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated_clicks")
    .option("checkpointLocation", "/checkpoints/aggregated_clicks")  # Required for the Kafka sink
    .outputMode("update")  # Only changed counts
    .start())

3. Console Sink (Your Best Friend for Dev)

Prints results to driver logs. Append for events, complete for tables.

Python Example:

query = (page_counts
    .writeStream
    .format("console")
    .outputMode("complete")
    .option("numRows", 20)  # Limit output
    .start())

4. Memory Sink (In-Memory Tables for Queries)

Creates an in-memory table you can query with SQL while the stream runs. Great for notebooks, but the full result sits in driver memory, so keep it small.

Python Example:

query = (page_counts
    .writeStream
    .format("memory")
    .queryName("live_counts")
    .outputMode("complete")
    .start())

# Now query it!
spark.sql("SELECT * FROM live_counts").show()

5. Foreach/ForeachBatch Sink (Custom Everything)

  • foreach: Per-row function (e.g., send an HTTP alert; see the sketch after the foreachBatch example below).

  • foreachBatch: Per-micro-batch (e.g., batch upsert to DB).

Python Example (Batch write to JDBC):

def upsert_to_db(batch_df, batch_id):
    # Appends each micro-batch; for true upserts, stage the batch and MERGE on the database side
    batch_df.write \
        .jdbc("jdbc:postgresql://host/db", "live_table",
              mode="append", properties={"user": "user", "password": "pass"})

query = (page_counts
    .writeStream
    .foreachBatch(upsert_to_db)
    .outputMode("update")
    .start())
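
For the per-row foreach variant, here's a minimal sketch of an HTTP alert (the webhook URL is hypothetical; real code would add batching, retries, and error handling):

import json
import urllib.request

def alert_on_row(row):
    # One HTTP POST per output row to a hypothetical webhook endpoint
    req = urllib.request.Request(
        "https://example.com/alerts",
        data=json.dumps(row.asDict()).encode("utf-8"),
        headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req)

alert_query = (page_counts
    .writeStream
    .foreach(alert_on_row)
    .outputMode("update")
    .start())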

Updates: foreachBatch is the standard way to run Delta MERGE (ACID upserts) from a stream, as sketched below; Spark 3.5 also brings foreachBatch to Spark Connect.
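
A sketch of that MERGE pattern, assuming delta-spark is installed and a Delta table already exists at the (placeholder) path:

from delta.tables import DeltaTable

def merge_counts(batch_df, batch_id):
    # Upsert the micro-batch into the target Delta table, keyed on page
    target = DeltaTable.forPath(spark, "/output/page_counts_delta")
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.page = s.page")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

merge_query = (page_counts
    .writeStream
    .foreachBatch(merge_counts)
    .option("checkpointLocation", "/checkpoints/page_counts_merge")
    .outputMode("update")
    .start())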

Other Sinks

  • Delta Sink: format("delta") — transactional, time-travel, schema enforcement.

  • JDBC: Via foreachBatch for databases like Postgres/MySQL.

  • Custom: Use foreachBatch, or implement the DataSource V2 streaming write interfaces, for sinks like Elasticsearch and Redis.

As Confluent's own blog posts show, Kafka + Spark is a common stack for end-to-end CDC pipelines.

Best Practices: Production-Ready I/O

  • Checkpoint Always: Use a reliable store (S3/DBFS) for checkpointLocation — it's your single point of recovery.

  • Schema First: Never infer — extract from a sample and enforce it.

  • Tune Throughput: Start with maxFilesPerTrigger=1 or maxOffsetsPerTrigger=1000, monitor, scale up.

  • Triggers Matter: .trigger(processingTime='5 seconds') for balanced latency/file size.

  • Monitoring: Use the Spark UI's Structured Streaming tab for lag and batch durations, or pull progress programmatically (see the snippet after this list). Integrate with Prometheus/Grafana.

  • Testing: Rate + Socket for unit tests; replay Kafka with old offsets for integration.

  • Scaling: Auto-scale with Delta Live Tables or Kubernetes; watch shuffle partitions for aggregations.
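
A minimal sketch of pulling those numbers programmatically, using the query handle from the earlier examples:

print(query.status)            # e.g., whether a trigger is active and data is available
progress = query.lastProgress  # dict for the most recent micro-batch (None before the first batch)
if progress:
    print(progress["numInputRows"], progress.get("inputRowsPerSecond"))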

Common Pitfalls: Forgetting checkpoints (loses state on restart), over-subscribing Kafka (OOM), small files in file sinks (use compaction).

Conclusion: Sources + Sinks = End-to-End Real-Time Power

With these building blocks, Structured Streaming isn't just a library — it's a full I/O toolkit for the messy, high-volume world of 2025 data. Kafka for ingestion? File for resilient logs? Delta for outputs that play nice with BI tools? You've got it all, with zero boilerplate.

The beauty: your code stays clean and unified. One pipeline handles batch backfills (static read) and live streams (streaming read). Add ML predictions? Join with static user data? Output to multiple sinks? All declarative, all fault-tolerant.

Companies like Netflix (event sourcing), Uber (ride matching), and DoorDash (order alerts) run millions of events/sec on this exact setup. Now it's your turn: grab a Kafka topic or S3 bucket, paste in the code, and watch data flow. Real-time isn't futuristic — it's just the right source, the right sink, and Spark doing the heavy lifting.