What is Spark Structured Streaming?

Structured Streaming is a scalable, fault-tolerant stream processing engine built on Spark SQL. It allows you to express streaming computations the same way as batch computations on static data.

Spark handles running the stream incrementally and continuously, updating the result as streaming data continues to arrive.
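
For example, the same transformation code runs against a static table and against a stream; only the read and write calls change. A minimal sketch, assuming a SparkSession named spark and an illustrative /data/events directory of JSON files:

# Batch: read a static directory once
batch_df = spark.read.format("json").load("/data/events")
batch_counts = batch_df.groupBy("event_type").count()
batch_counts.write.format("parquet").mode("overwrite").save("/out/event_counts")

# Streaming: the same transformation, applied incrementally
stream_df = spark.readStream.format("json").schema(batch_df.schema).load("/data/events")
stream_counts = stream_df.groupBy("event_type").count()
stream_counts.writeStream.format("console").outputMode("complete").start()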

Key Concepts

┌─────────────────────────────────────────────────────────────────┐
│              Structured Streaming Concepts                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Input Sources                                                   │
│  ├── Kafka           (most common for production)               │
│  ├── Files           (JSON, CSV, Parquet, ORC)                  │
│  ├── Socket          (testing only)                             │
│  └── Rate Source     (testing/benchmarking)                     │
│                                                                  │
│  Output Sinks                                                    │
│  ├── Kafka           (streaming to Kafka)                       │
│  ├── Files           (Parquet, JSON, etc.)                      │
│  ├── Delta Lake      (ACID streaming)                           │
│  ├── Console         (debugging)                                │
│  ├── Memory          (testing)                                  │
│  └── foreach/Batch   (custom sinks)                             │
│                                                                  │
│  Output Modes                                                    │
│  ├── Append          (new rows only)                            │
│  ├── Complete        (all rows, for aggregations)               │
│  └── Update          (only changed rows)                        │
│                                                                  │
│  Triggers                                                        │
│  ├── Default         (process ASAP)                             │
│  ├── Fixed interval  (every X seconds/minutes)                  │
│  ├── Once            (one batch, then stop)                     │
│  └── Available-now   (process all available, then stop)         │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
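
The rate source and console sink listed above make it easy to try these concepts without Kafka. A quick sketch, assuming a SparkSession named spark:

# Rate source: generates rows with `timestamp` and `value` columns
rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Console sink: prints each micro-batch (debugging only)
debug_query = rate_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()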

Basic Streaming Query

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("StreamingExample") \
    .getOrCreate()

# Schema of the JSON payload in the Kafka value (adjust the fields to your events)
schema = StructType([
    StructField("event_type", StringType()),
    StructField("product_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType()),
])

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka returns key, value, topic, partition, offset, timestamp
# Value is binary - need to cast and parse
events = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("data"),
    col("timestamp").alias("kafka_timestamp")
).select("key", "data.*", "kafka_timestamp")

# Transform
processed = events \
    .filter(col("event_type") == "purchase") \
    .withColumn("processed_at", current_timestamp())

# Write to Delta Lake
query = processed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/purchases") \
    .start("/delta/purchases")

# Wait for termination
query.awaitTermination()
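
To publish results back to Kafka instead (the Kafka sink listed above), the output needs a string or binary value column; a sketch reusing the processed DataFrame, with a hypothetical purchases-out topic:

kafka_query = processed \
    .selectExpr("key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "purchases-out") \
    .option("checkpointLocation", "/checkpoints/purchases_kafka") \
    .start()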

Windowed Aggregations

from pyspark.sql.functions import window, session_window, col, count, sum, avg, min, max

# Tumbling window (non-overlapping)
# Count events per 5-minute window
tumbling = events.groupBy(
    window(col("event_time"), "5 minutes"),
    col("event_type")
).agg(
    count("*").alias("event_count"),
    sum("amount").alias("total_amount")
)

# Sliding window (overlapping)
# 10-minute windows, sliding every 5 minutes
sliding = events.groupBy(
    window(col("event_time"), "10 minutes", "5 minutes"),
    col("product_id")
).agg(
    count("*").alias("count"),
    avg("amount").alias("avg_amount")
)

# Session window (gap-based)
# Sessions with 30-minute inactivity gap
session = events.groupBy(
    session_window(col("event_time"), "30 minutes"),
    col("user_id")
).agg(
    count("*").alias("events_in_session"),
    min("event_time").alias("session_start"),
    max("event_time").alias("session_end")
)

# Write windowed aggregations (update mode emits only the windows that changed in each batch)
query = tumbling.writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/metrics") \
    .start("/delta/event_metrics")

Watermarks (Late Data Handling)

# Watermarks define how late data is allowed to arrive
# Data older than the watermark may be dropped and its state cleaned up

# Allow data up to 1 hour late
events_with_watermark = events \
    .withWatermark("event_time", "1 hour")

# Aggregate with watermark
aggregated = events_with_watermark.groupBy(
    window(col("event_time"), "10 minutes"),
    col("product_id")
).agg(
    count("*").alias("count")
)

# Without watermark: State grows indefinitely
# With watermark: Old state is cleaned up

# Visualization of watermark behavior:
"""
Event Time:  12:00  12:10  12:20  12:30  12:40
Watermark:   11:00  11:10  11:20  11:30  11:40  (1 hour behind)

At 12:40:
- Events with event_time >= 11:40 are guaranteed to be processed
- Events with event_time < 11:40 may be dropped as too late
- State for windows ending before 11:40 is cleaned up
"""

Stream-Stream Joins

# Join two streams (e.g., clicks and impressions)
# The Kafka `value` must be parsed with from_json first, as in the basic example;
# click_schema / impression_schema are assumed schemas for the two payloads
from pyspark.sql.functions import expr, from_json, count, approx_count_distinct

clicks = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .select(from_json(col("value").cast("string"), click_schema).alias("data")) \
    .select(
        col("data.user_id"),
        col("data.ad_id"),
        col("data.click_time").alias("click_time")
    ).withWatermark("click_time", "10 minutes") \
    .alias("clicks")

impressions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .select(from_json(col("value").cast("string"), impression_schema).alias("data")) \
    .select(
        col("data.user_id"),
        col("data.ad_id"),
        col("data.impression_time").alias("impression_time")
    ).withWatermark("impression_time", "10 minutes") \
    .alias("impressions")

# Join clicks with impressions
# Click must happen within 5 minutes after impression
joined = clicks.join(
    impressions,
    expr("""
        clicks.user_id = impressions.user_id AND
        clicks.ad_id = impressions.ad_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 5 minutes
    """),
    "inner"
)

# Aggregate joined click events per ad
# (exact distinct counts are not supported in streaming aggregations,
#  so approx_count_distinct is used instead of countDistinct)
ctr = joined.groupBy("ad_id").agg(
    count("*").alias("clicks"),
    approx_count_distinct("user_id").alias("unique_users")
)
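
Outer stream-stream joins are also supported as long as both sides carry watermarks and the join has a time constraint. For example, a left outer join from impressions emits impressions that never received a click within the 5-minute window, with null click columns:

unmatched = impressions.join(
    clicks,
    expr("""
        clicks.user_id = impressions.user_id AND
        clicks.ad_id = impressions.ad_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 5 minutes
    """),
    "leftOuter"
)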

foreachBatch (Custom Sinks)

# Process each micro-batch with custom logic
def process_batch(batch_df, batch_id):
    """Custom processing for each micro-batch"""
    # The batch is reused several times below, so cache it once
    batch_df.persist()
    print(f"Processing batch {batch_id} with {batch_df.count()} rows")

    # Write to multiple destinations
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .save("/delta/events")

    # Update aggregates in a database
    aggregates = batch_df.groupBy("event_type").count()
    aggregates.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/db") \
        .option("dbtable", "event_counts") \
        .mode("append") \
        .save()

    # Send alerts for specific events
    alerts = batch_df.filter(col("severity") == "critical")
    if alerts.count() > 0:
        send_alerts(alerts.collect())

    batch_df.unpersist()

# Apply foreachBatch
query = events.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/custom") \
    .trigger(processingTime="1 minute") \
    .start()
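
foreachBatch gives at-least-once guarantees because a micro-batch may be re-run after a failure. When the target is Delta Lake, the re-run can be made idempotent by tagging each write with the batch id; a sketch, where "purchases_stream" is any stable application id you choose (supported by newer Delta Lake releases):

def idempotent_batch(batch_df, batch_id):
    # Delta skips the write if this (txnAppId, txnVersion) pair was already committed
    batch_df.write \
        .format("delta") \
        .option("txnAppId", "purchases_stream") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("/delta/events")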

# Arbitrary stateful processing (mapGroupsWithState is the Scala/Java API;
# PySpark 3.4+ exposes the equivalent as applyInPandasWithState)
import pandas as pd
from pyspark.sql.streaming.state import GroupState

def update_user_state(key, batches, state: GroupState):
    """Track a running event count and amount total per user"""
    count, total = state.get if state.exists else (0, 0.0)
    for pdf in batches:                  # iterator of pandas DataFrames for this key
        count += len(pdf)
        total += float(pdf["amount"].sum())

    state.update((count, total))         # state value is a tuple matching the state schema
    yield pd.DataFrame({"user_id": [key[0]], "count": [count], "total": [total]})
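
Hooking the function into a query then looks roughly like this; the output and state schemas are assumptions that match the sketch above:

from pyspark.sql.streaming.state import GroupStateTimeout

stateful = events.groupBy("user_id").applyInPandasWithState(
    update_user_state,
    outputStructType="user_id STRING, count LONG, total DOUBLE",
    stateStructType="count LONG, total DOUBLE",
    outputMode="update",
    timeoutConf=GroupStateTimeout.NoTimeout
)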

Triggers

# In PySpark, triggers are specified as keyword arguments to .trigger()

# Default (process as fast as possible)
query = df.writeStream \
    .format("delta") \
    .start()

# Fixed interval (every 30 seconds)
query = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .start()

# Once (single batch, useful for testing or backfill; newer Spark versions prefer availableNow below)
query = df.writeStream \
    .format("delta") \
    .trigger(once=True) \
    .start()

# Available-now (process all available data, then stop)
query = df.writeStream \
    .format("delta") \
    .trigger(availableNow=True) \
    .start()

# Continuous processing (experimental, ~1 ms end-to-end latency;
# only map-like operations are supported, and "1 second" is the checkpoint interval)
query = df.writeStream \
    .format("kafka") \
    .trigger(continuous="1 second") \
    .start()

Monitoring & Debugging

# Check query status
query = df.writeStream.format("delta").start()

# Get current status
print(query.status)
# {'message': 'Processing new data', 'isDataAvailable': True, ...}

# Get recent progress
print(query.recentProgress)
# [{'inputRowsPerSecond': 1000, 'processedRowsPerSecond': 5000, ...}]

# Last progress report
progress = query.lastProgress
print(f"Input rate: {progress['inputRowsPerSecond']}")
print(f"Processing rate: {progress['processedRowsPerSecond']}")
print(f"Batch duration: {progress['batchDuration']} ms")

# Stop query gracefully
query.stop()

# List active queries
for q in spark.streams.active:
    print(f"{q.name}: {q.status}")

# Exception handling
from pyspark.errors import StreamingQueryException  # pyspark.sql.utils on Spark < 3.4

try:
    query.awaitTermination()
except StreamingQueryException as e:
    print(f"Query failed: {e}")

Best Practices

  • Always use watermarks: Prevents unbounded state growth
  • Choose right trigger: Balance latency vs throughput
  • Checkpoint to durable storage: Enable recovery from failures
  • Monitor lag: Track how far processing trails event time (see the sketch after this list)
  • Test with batch: The same transformations can be run as a batch job for easier debugging
  • Use Delta Lake: ACID guarantees for streaming sinks
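
A simple way to watch event-time lag is to compare each record's event time with the wall clock at processing time (assuming an event_time column, as in the examples above):

from pyspark.sql.functions import col, current_timestamp, unix_timestamp

lagged = events.withColumn(
    "lag_seconds",
    unix_timestamp(current_timestamp()) - unix_timestamp(col("event_time"))
)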

Master Spark Streaming

Our Data Engineering program covers real-time processing with Spark and Kafka.

Explore Data Engineering Program
