What is Change Data Capture?

Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database. Instead of querying entire tables, CDC tracks individual INSERT, UPDATE, and DELETE operations, enabling real-time data streaming.

CDC is essential for building real-time data pipelines, synchronizing databases, and implementing event-driven architectures.

Why Use CDC?

  • Real-time sync: Keep data warehouses up-to-date without batch jobs
  • Low latency: Changes typically propagate within seconds, often sub-second
  • Reduced load: Only transfer changes, not full tables
  • Event-driven: Enable reactive architectures
  • Audit trail: Complete history of all changes
  • Decoupling: Source database unaware of consumers

CDC Approaches

┌─────────────────────────────────────────────────────────────────┐
│                      CDC Approaches                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Log-Based CDC (Recommended)                                  │
│     • Reads database transaction logs (WAL, binlog)              │
│     • Low impact on source database performance                  │
│     • Captures all changes including deletes                     │
│     • Tools: Debezium, AWS DMS, Fivetran                        │
│                                                                  │
│  2. Trigger-Based CDC                                            │
│     • Database triggers write changes to CDC table               │
│     • Simple to implement                                        │
│     • Performance overhead on source database                    │
│                                                                  │
│  3. Timestamp-Based CDC                                          │
│     • Query records modified since last sync                     │
│     • Requires updated_at column                                 │
│     • Cannot capture deletes                                     │
│     • Simple but not true real-time                              │
│                                                                  │
│  4. Diff-Based CDC                                               │
│     • Compare snapshots to find changes                          │
│     • Resource intensive                                         │
│     • Good for tables without timestamps                         │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
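
To make the trade-offs concrete, here is a minimal sketch of timestamp-based CDC (approach 3). It uses an in-memory SQLite table as a stand-in for the source database; the customers table, its integer updated_at column, and the fetch_changes helper are assumptions made for illustration only.

```python
import sqlite3

def fetch_changes(conn, last_sync):
    """Return rows modified after last_sync, oldest first."""
    cur = conn.execute(
        "SELECT id, name, updated_at FROM customers "
        "WHERE updated_at > ? ORDER BY updated_at, id",
        (last_sync,),
    )
    return cur.fetchall()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [(1, "Alice", 100), (2, "Bob", 100)],
)

# The first sync picks up both rows and records the high-water mark.
first = fetch_changes(conn, last_sync=0)
last_sync = max(row[2] for row in first)

# An update bumps updated_at; a delete leaves no row behind to query.
conn.execute("UPDATE customers SET name = 'Alicia', updated_at = 200 WHERE id = 1")
conn.execute("DELETE FROM customers WHERE id = 2")

second = fetch_changes(conn, last_sync)
print(second)  # [(1, 'Alicia', 200)]
```

The second poll returns the update to id 1 but nothing about id 2, which is why this approach cannot capture deletes unless the application switches to soft deletes.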

Debezium

Debezium is an open-source distributed platform for CDC. It captures row-level changes in databases and streams them to Kafka.

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Database   │────▶│    Debezium     │────▶│    Kafka     │
│  (PostgreSQL)│     │ (Kafka Connect) │     │   Topics     │
└──────────────┘     └─────────────────┘     └──────────────┘
       │                                            │
       │                                            ▼
       │                                   ┌──────────────────┐
       │                                   │    Consumers     │
       └── Reads WAL/binlog                │ (Spark, Flink,   │
           with low impact on              │  Applications)   │
           database performance            └──────────────────┘

Setting Up Debezium

# Docker Compose for Debezium setup
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

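  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"  # Lets clients on the host reach the broker at localhost:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 inside the Compose network, localhost:9092 from the host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mydb  # Database the connector reads from
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # Required for CDC

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:29092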
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    ports:
      - "8083:8083"

PostgreSQL CDC Connector

# Create Debezium connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "mydb",
      "table.include.list": "public.customers,public.orders",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "dbz_publication",
      "topic.prefix": "dbserver1"
    }
  }'

# Check connector status
curl http://localhost:8083/connectors/postgres-connector/status
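
The same status check can be scripted. The sketch below assumes the usual shape of the Kafka Connect status response (a connector object and a list of tasks, each with a state field); the helper names connector_is_healthy and fetch_status are hypothetical, and the sample response is illustrative rather than captured from a real cluster.

```python
import json
from urllib.request import urlopen

def connector_is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)

def fetch_status(base_url: str, name: str) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status", timeout=5) as resp:
        return json.load(resp)

# Illustrative response: the connector is up, but its only task has failed.
sample = {
    "name": "postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
    "type": "source",
}
print(connector_is_healthy(sample))  # False
```

Against the setup above, the live check would be connector_is_healthy(fetch_status("http://localhost:8083", "postgres-connector")). Checking the tasks matters because a connector can report RUNNING while its task has failed.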

CDC Event Structure

// Debezium change event (Kafka message)
{
  "schema": { ... },
  "payload": {
    "before": {                    // Previous state (null for INSERT; full row needs REPLICA IDENTITY FULL)
      "id": 1,
      "name": "Alice",
      "email": "alice@old.com"
    },
    "after": {                     // New state (null for DELETE)
      "id": 1,
      "name": "Alice",
      "email": "alice@new.com"
    },
    "source": {
      "version": "2.4.0",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1703412345678,
      "snapshot": "false",
      "db": "mydb",
      "schema": "public",
      "table": "customers",
      "txId": 12345,
      "lsn": 98765432
    },
    "op": "u",                     // Operation: c=create, u=update, d=delete, r=read (snapshot)
    "ts_ms": 1703412345680,
    "transaction": null
  }
}
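
As a rough illustration of how a consumer might read this envelope, the sketch below turns an event into a small typed record and derives the capture lag from the two ts_ms fields (connector processing time minus source commit time). The Change dataclass and parse_event helper are hypothetical names, and the event is a trimmed copy of the sample above.

```python
from dataclasses import dataclass
from typing import Optional

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}

@dataclass
class Change:
    table: str
    operation: str
    before: Optional[dict]
    after: Optional[dict]
    capture_lag_ms: int

def parse_event(event: dict) -> Change:
    """Extract the fields most consumers need from a Debezium envelope."""
    payload = event["payload"]
    source = payload["source"]
    return Change(
        table=f'{source["schema"]}.{source["table"]}',
        operation=OPERATIONS.get(payload["op"], "unknown"),
        before=payload.get("before"),
        after=payload.get("after"),
        # payload.ts_ms: when the connector processed the event
        # source.ts_ms: when the change was made in the database
        capture_lag_ms=payload["ts_ms"] - source["ts_ms"],
    )

event = {
    "payload": {
        "before": {"id": 1, "name": "Alice", "email": "alice@old.com"},
        "after": {"id": 1, "name": "Alice", "email": "alice@new.com"},
        "source": {"schema": "public", "table": "customers", "ts_ms": 1703412345678},
        "op": "u",
        "ts_ms": 1703412345680,
    }
}

change = parse_event(event)
print(change.table, change.operation, change.capture_lag_ms)
# public.customers update 2
```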

Consuming CDC Events

# Python consumer with kafka-python
from kafka import KafkaConsumer
import json

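consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    # A delete is followed by a tombstone (null value), so guard against None
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m is not None else None
)

for message in consumer:
    event = message.value
    if event is None:  # Skip tombstones
        continue
    operation = event['payload']['op']

    if operation in ('c', 'r'):  # Create, or read during the initial snapshot
        new_record = event['payload']['after']
        print(f"INSERT: {new_record}")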

    elif operation == 'u':  # Update
        old_record = event['payload']['before']
        new_record = event['payload']['after']
        print(f"UPDATE: {old_record} -> {new_record}")

    elif operation == 'd':  # Delete
        deleted_record = event['payload']['before']
        print(f"DELETE: {deleted_record}")


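# PySpark Streaming consumer
# Needs the Kafka source on the classpath, e.g.
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version>
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder \
    .appName("CDC Consumer") \
    .getOrCreate()

# Envelope schema for the customers table (columns taken from the sample event)
row = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
])
schema = StructType([
    StructField("payload", StructType([
        StructField("before", row),
        StructField("after", row),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
    ])),
])

def process_batch(batch_df, batch_id):
    # Placeholder handler: print each micro-batch
    batch_df.show(truncate=False)

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.customers") \
    .load()

# Parse CDC events
parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.payload.*")

# Process changes
query = parsed.writeStream \
    .foreachBatch(process_batch) \
    .start()
query.awaitTermination()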

CDC to Data Lake

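# Merge CDC changes into Delta Lake
# Reuses the `spark` session from the consumer above; it must be configured
# with the Delta Lake extensions, and the table at /delta/customers must exist
from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, col, from_json, row_number

# Envelope schema as a DDL string (columns taken from the sample event)
ROW = "STRUCT<id: BIGINT, name: STRING, email: STRING>"
ENVELOPE = f"payload STRUCT<before: {ROW}, after: {ROW}, op: STRING>"

def upsert_to_delta(batch_df, batch_id):
    # MERGE fails if several source rows match one target row, so keep only
    # the latest event per key. Debezium keys messages by primary key, so all
    # events for a row share a partition and the Kafka offset orders them.
    newest_first = Window.partitionBy("id").orderBy(col("offset").desc())
    latest = batch_df \
        .withColumn("id", coalesce(col("after.id"), col("before.id"))) \
        .withColumn("rn", row_number().over(newest_first)) \
        .filter(col("rn") == 1) \
        .select("id", "op", "after.name", "after.email")

    delta_table = DeltaTable.forPath(spark, "/delta/customers")
    columns = {"id": "source.id", "name": "source.name", "email": "source.email"}

    # Apply deletes, updates, and inserts (c = create, r = snapshot read) in one MERGE
    delta_table.alias("target").merge(
        latest.alias("source"),
        "target.id = source.id"
    ).whenMatchedDelete(condition="source.op = 'd'") \
     .whenMatchedUpdate(condition="source.op != 'd'", set=columns) \
     .whenNotMatchedInsert(condition="source.op != 'd'", values=columns) \
     .execute()

# Stream CDC to Delta Lake
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.customers") \
    .load()

# Parse the Debezium envelope; tombstones (null values) have no op and are dropped
events = cdc_stream.select(
    from_json(col("value").cast("string"), ENVELOPE).alias("data"),
    col("offset")
).select("data.payload.*", "offset") \
 .filter(col("op").isNotNull())

events.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/checkpoints/customers_cdc") \
    .start()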
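
One detail worth spelling out: a single micro-batch can hold several events for the same row, while a merge expects at most one source row per key. The plain-Python sketch below shows that compaction step on its own, without Spark. The compact_batch helper and the sample payloads are hypothetical, and it assumes events arrive in commit order.

```python
def compact_batch(events):
    """Keep only the last event per primary key (later events win)."""
    latest = {}
    for payload in events:
        row = payload["after"] or payload["before"]
        latest[row["id"]] = payload
    return list(latest.values())

batch = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "Alice"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Bob"}},
    {"op": "u", "before": None, "after": {"id": 1, "name": "Alicia"}},
    {"op": "d", "before": {"id": 2}, "after": None},
]

compacted = compact_batch(batch)
print([(p["op"], (p["after"] or p["before"])["id"]) for p in compacted])
# [('u', 1), ('d', 2)]
```

A create followed by an update collapses into the update, which is safe as long as the merge treats updates as upserts.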

Best Practices

  • Use log-based CDC: Minimal impact on source database
  • Handle schema changes: Plan for column additions/removals
  • Monitor lag: Track replication delay
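  • Idempotent consumers: Delivery is typically at-least-once, so handle duplicate events gracefully
  • Preserve ordering: Debezium keys events by primary key, so per-row order holds within a partition; use a single partition only for strict global ordering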
  • Initial snapshot: Load existing data before streaming
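
To illustrate the idempotent-consumer point, here is a minimal sketch that applies events to an in-memory table and skips anything it has already seen. It assumes the source.lsn value only ever increases for a given row, which is worth verifying for your connector and version; the apply_event helper and the trimmed payloads are hypothetical.

```python
def apply_event(state: dict, applied_lsn: dict, payload: dict) -> bool:
    """Apply one change keyed by id; return False for duplicates or stale replays."""
    row = payload["after"] or payload["before"]
    key = row["id"]
    lsn = payload["source"]["lsn"]
    if lsn <= applied_lsn.get(key, -1):
        return False  # Already applied this change or a newer one
    if payload["op"] == "d":
        state.pop(key, None)
    else:
        state[key] = payload["after"]
    applied_lsn[key] = lsn
    return True

state, applied = {}, {}
insert = {"op": "c", "before": None,
          "after": {"id": 1, "email": "alice@old.com"}, "source": {"lsn": 100}}
update = {"op": "u", "before": None,
          "after": {"id": 1, "email": "alice@new.com"}, "source": {"lsn": 200}}

# The update is redelivered, then the old insert is replayed
for payload in (insert, update, update, insert):
    apply_event(state, applied, payload)

print(state)  # {1: {'id': 1, 'email': 'alice@new.com'}}
```

In a real pipeline the per-key LSN would live in the sink alongside the row, so the check and the write happen in the same transaction.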
