What is Change Data Capture?
Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database. Instead of querying entire tables, CDC tracks individual INSERT, UPDATE, and DELETE operations, enabling real-time data streaming.
CDC is essential for building real-time data pipelines, synchronizing databases, and implementing event-driven architectures.
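To make the idea concrete, the sketch below replays a short list of change events against an in-memory copy of a table. The event shape (`op`, `key`, `row`) and the `apply_change` helper are simplifications invented for illustration; they are not the format of any particular CDC tool.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]

def apply_change(replica: Dict[int, Row], op: str, key: int, row: Optional[Row] = None) -> None:
    """Apply one change event to an in-memory replica keyed by primary key."""
    if op in ("insert", "update"):
        replica[key] = dict(row or {})
    elif op == "delete":
        replica.pop(key, None)  # deleting a missing key is a no-op
    else:
        raise ValueError(f"unknown operation: {op}")

# Hypothetical change stream, in commit order
changes = [
    ("insert", 1, {"id": 1, "email": "alice@old.com"}),
    ("insert", 2, {"id": 2, "email": "bob@example.com"}),
    ("update", 1, {"id": 1, "email": "alice@new.com"}),
    ("delete", 2, None),
]

replica: Dict[int, Row] = {}
for op, key, row in changes:
    apply_change(replica, op, key, row)

print(replica)  # {1: {'id': 1, 'email': 'alice@new.com'}}
```

Only four small events cross the wire, yet the replica ends up matching the source; no full-table query is needed.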
Why Use CDC?
- Real-time sync: Keep data warehouses up-to-date without batch jobs
- Low latency: Changes propagate in near real time, often within a second with log-based CDC
- Reduced load: Only transfer changes, not full tables
- Event-driven: Enable reactive architectures
- Audit trail: Complete history of all changes
- Decoupling: Source database unaware of consumers
CDC Approaches
┌─────────────────────────────────────────────────────────────────┐
│                         CDC Approaches                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Log-Based CDC (Recommended)                                 │
│     • Reads database transaction logs (WAL, binlog)             │
│     • Minimal impact on source database performance             │
│     • Captures all changes including deletes                    │
│     • Tools: Debezium, AWS DMS, Fivetran                        │
│                                                                 │
│  2. Trigger-Based CDC                                           │
│     • Database triggers write changes to CDC table              │
│     • Simple to implement                                       │
│     • Performance overhead on source database                   │
│                                                                 │
│  3. Timestamp-Based CDC                                         │
│     • Query records modified since last sync                    │
│     • Requires updated_at column                                │
│     • Cannot capture hard deletes                               │
│     • Simple but not true real-time                             │
│                                                                 │
│  4. Diff-Based CDC                                              │
│     • Compare snapshots to find changes                         │
│     • Resource intensive                                        │
│     • Good for tables without timestamps                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
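As a rough illustration of the timestamp-based approach and its blind spot for deletes, here is a minimal sketch using an in-memory SQLite table. The `poll_changes` helper, the table contents, and the integer `updated_at` values (standing in for real timestamps) are all assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT, updated_at INTEGER)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [(1, "alice@old.com", 100), (2, "bob@example.com", 100)],
)

def poll_changes(conn, last_sync):
    """Return rows modified after last_sync, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, email, updated_at FROM customers "
        "WHERE updated_at > ? ORDER BY updated_at, id",
        (last_sync,),
    ).fetchall()
    new_mark = max((r[2] for r in rows), default=last_sync)
    return rows, new_mark

first_batch, mark = poll_changes(conn, 0)      # initial sync sees both rows

conn.execute("UPDATE customers SET email = ?, updated_at = ? WHERE id = 1",
             ("alice@new.com", 200))
conn.execute("DELETE FROM customers WHERE id = 2")

second_batch, mark = poll_changes(conn, mark)  # the deleted row never shows up
print(second_batch)  # [(1, 'alice@new.com', 200)]
```

The second poll returns the update but carries no trace of the deleted row, which is why timestamp-based sync usually needs soft deletes or a separate reconciliation step.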
Debezium
Debezium is an open-source distributed platform for CDC. It captures row-level changes in databases and streams them to Kafka.
┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Database   │────▶│    Debezium     │────▶│    Kafka     │
│ (PostgreSQL) │     │ (Kafka Connect) │     │    Topics    │
└──────────────┘     └─────────────────┘     └──────────────┘
       │                                            │
       │                                            ▼
       │                                   ┌──────────────────┐
       │                                   │    Consumers     │
       └── Reads WAL/binlog                │  (Spark, Flink,  │
           with minimal impact on          │   Applications)  │
           database performance            └──────────────────┘
Setting Up Debezium
# Docker Compose for Debezium setup
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised as kafka:9092, so the broker is reachable only from inside
      # the Compose network; host-side clients need an additional listener
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mydb           # Database the connector connects to
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"       # Required for CDC

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    ports:
      - "8083:8083"
PostgreSQL CDC Connector
# Create Debezium connector via REST API
# No ExtractNewRecordState transform is configured: that transform flattens
# each event to the new row state, and the consumers in this article read the
# full before/after/op envelope. Debezium 2.x uses topic.prefix in place of
# the older database.server.name property.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "mydb",
      "table.include.list": "public.customers,public.orders",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "dbz_publication",
      "topic.prefix": "dbserver1"
    }
  }'

# Check connector status
curl http://localhost:8083/connectors/postgres-connector/status
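The status endpoint returns JSON with one state for the connector and one per task, and a connector can report RUNNING while a task has failed. The following sketch checks both. The helper names `is_healthy` and `fetch_status` are made up for this example, and the sample response only mirrors the general shape Kafka Connect returns; its values are illustrative.

```python
import json
from urllib.request import urlopen

def is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)

def fetch_status(base_url: str, name: str) -> dict:
    """GET /connectors/<name>/status from a Kafka Connect worker."""
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)

# Illustrative response: the connector is up but its only task has failed
sample = {
    "name": "postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."}],
}
print(is_healthy(sample))  # False
```

Against the setup above, a live check would be `is_healthy(fetch_status("http://localhost:8083", "postgres-connector"))`.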
CDC Event Structure
// Debezium change event (Kafka message)
{
  "schema": { ... },
  "payload": {
    // Previous state (null for INSERT). On PostgreSQL the full previous row
    // is available only when the table uses REPLICA IDENTITY FULL.
    "before": {
      "id": 1,
      "name": "Alice",
      "email": "alice@old.com"
    },
    // New state (null for DELETE)
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@new.com"
    },
    "source": {
      "version": "2.4.0",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1703412345678,
      "snapshot": "false",
      "db": "mydb",
      "schema": "public",
      "table": "customers",
      "txId": 12345,
      "lsn": 98765432
    },
    "op": "u",  // Operation: c=create, u=update, d=delete, r=read (initial snapshot)
    "ts_ms": 1703412345680,
    "transaction": null
  }
}
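Because an update event carries both row images, a consumer can work out which columns actually changed. The sketch below does this for the payload shown above; `changed_fields` is a hypothetical helper, not part of Debezium.

```python
from typing import Any, Dict, Optional, Tuple

def changed_fields(before: Optional[dict], after: Optional[dict]) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to an (old, new) pair."""
    before = before or {}
    after = after or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }

payload = {
    "before": {"id": 1, "name": "Alice", "email": "alice@old.com"},
    "after": {"id": 1, "name": "Alice", "email": "alice@new.com"},
    "op": "u",
}
print(changed_fields(payload["before"], payload["after"]))
# {'email': ('alice@old.com', 'alice@new.com')}
```

For inserts (`before` is null) every column is reported as changed from `None`, and for deletes every column changes to `None`.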
Consuming CDC Events
# Python consumer with kafka-python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    # Debezium follows each delete with a tombstone (null value); pass it through as None
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m is not None else None
)

for message in consumer:
    event = message.value
    if event is None:  # Tombstone: nothing to process
        continue
    payload = event['payload']
    operation = payload['op']
    if operation in ('c', 'r'):  # Create, or row read during the initial snapshot
        print(f"INSERT: {payload['after']}")
    elif operation == 'u':  # Update
        print(f"UPDATE: {payload['before']} -> {payload['after']}")
    elif operation == 'd':  # Delete
        print(f"DELETE: {payload['before']}")

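# PySpark Structured Streaming consumer (needs the spark-sql-kafka-0-10 package)
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder \
    .appName("CDC Consumer") \
    .getOrCreate()
# Columns of public.customers, as in the sample event above
row_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
])
# Envelope fields used downstream; fields not declared here are dropped by from_json
schema = StructType([StructField("payload", StructType([
    StructField("before", row_schema),
    StructField("after", row_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
]))])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.customers") \
    .load()
# Parse CDC events
parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.payload.*")

def process_batch(batch_df, batch_id):
    batch_df.show(truncate=False)  # Placeholder sink: print each micro-batch
# Process changes
query = parsed.writeStream \
    .foreachBatch(process_batch) \
    .start()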
CDC to Data Lake
# Merge CDC changes into Delta Lake
# Reuses `spark` and the `parsed` stream (columns before, after, op, ts_ms)
# from the PySpark consumer above
from delta.tables import DeltaTable
from pyspark.sql.functions import col, coalesce, row_number
from pyspark.sql.window import Window

def upsert_to_delta(batch_df, batch_id):
    # MERGE fails when several source rows match one target row, so keep only
    # the latest event per key. ts_ms can tie within a millisecond; order by
    # source.lsn instead if you parse it.
    key = coalesce(col("after.id"), col("before.id"))
    newest_first = Window.partitionBy(key).orderBy(col("ts_ms").desc())
    latest = batch_df.filter(col("op").isNotNull()) \
        .withColumn("rn", row_number().over(newest_first)) \
        .filter(col("rn") == 1)

    # Split by operation type (r = row read during the initial snapshot)
    upserts = latest.filter(col("op").isin("c", "r", "u")).select("after.*")
    deletes = latest.filter(col("op") == "d").select(col("before.id").alias("id"))

    delta_table = DeltaTable.forPath(spark, "/delta/customers")

    # Merge inserts and updates
    delta_table.alias("target").merge(
        upserts.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

    # Process deletes with a second merge instead of collecting ids to the driver
    delta_table.alias("target").merge(
        deletes.alias("source"),
        "target.id = source.id"
    ).whenMatchedDelete() \
     .execute()

# Stream the parsed CDC events to Delta Lake
parsed.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/checkpoints/customers_cdc") \
    .start()
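A merge-style upsert generally needs at most one source row per key, so a micro-batch containing several changes to the same row has to be collapsed first. The plain-Python sketch below shows that compaction step in isolation. The `latest_per_key` helper and the sample batch are hypothetical, and ordering by `source.lsn` assumes every event carries a distinct, increasing LSN.

```python
def latest_per_key(events):
    """Keep the event with the highest LSN for each primary key."""
    latest = {}
    for event in events:
        row = event["after"] or event["before"]  # deletes only carry `before`
        key = row["id"]
        lsn = event["source"]["lsn"]
        if key not in latest or lsn > latest[key]["source"]["lsn"]:
            latest[key] = event
    return latest

# Hypothetical micro-batch: two changes to row 1, then row 2 created and deleted
batch = [
    {"op": "c", "before": None, "after": {"id": 1, "email": "a@x.com"}, "source": {"lsn": 10}},
    {"op": "u", "before": None, "after": {"id": 1, "email": "a@y.com"}, "source": {"lsn": 12}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "b@x.com"}, "source": {"lsn": 11}},
    {"op": "d", "before": {"id": 2}, "after": None, "source": {"lsn": 13}},
]

compacted = latest_per_key(batch)
print({key: event["op"] for key, event in compacted.items()})  # {1: 'u', 2: 'd'}
```

After compaction, row 1 is upserted once with its final state and row 2 is simply deleted, regardless of how many intermediate events arrived.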
Best Practices
- Use log-based CDC: Minimal impact on source database
- Handle schema changes: Plan for column additions/removals
- Monitor lag: Track replication delay
- Idempotent consumers: Handle duplicate events gracefully
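- Preserve ordering: Kafka orders events only within a partition. Debezium keys events by primary key, so changes to the same row stay in order; use a single partition only when you need strict ordering across rows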
- Initial snapshot: Load existing data before streaming
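Two of these practices, idempotent consumers and lag monitoring, can be sketched together in a few lines. The `IdempotentApplier` class below is a hypothetical illustration that operates on Debezium-style payloads. It assumes events arrive in order from a single partition and that each one carries a distinct, increasing `source.lsn`; a production consumer would also persist `last_lsn` alongside the data it writes.

```python
class IdempotentApplier:
    """Applies Debezium-style payloads at most once, using the source LSN."""

    def __init__(self):
        self.rows = {}       # primary key -> current row
        self.last_lsn = -1   # highest LSN applied so far (assumed increasing)
        self.lag_ms = None   # delay between source commit and processing

    def apply(self, payload, now_ms):
        lsn = payload["source"]["lsn"]
        if lsn <= self.last_lsn:
            return False     # duplicate or replayed event: skip it
        if payload["op"] == "d":
            self.rows.pop(payload["before"]["id"], None)
        else:
            self.rows[payload["after"]["id"]] = payload["after"]
        self.last_lsn = lsn
        self.lag_ms = now_ms - payload["source"]["ts_ms"]
        return True

applier = IdempotentApplier()
event = {
    "op": "c",
    "before": None,
    "after": {"id": 1, "email": "alice@new.com"},
    "source": {"lsn": 100, "ts_ms": 1703412345678},
}
first = applier.apply(event, now_ms=1703412345900)
second = applier.apply(event, now_ms=1703412346000)  # simulated redelivery
print(first, second, applier.lag_ms)  # True False 222
```

The redelivered event is ignored, and `lag_ms` gives a simple per-event measure of replication delay that can be exported to a metrics system.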