What is Delta Lake?
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and data lakes. It adds reliability, data quality, and performance on top of an existing data lake, enabling what's known as a "lakehouse" architecture.
Originally developed by Databricks, Delta Lake solves common data lake problems like data corruption, failed writes, and inconsistent reads.
Why Delta Lake?
- ACID Transactions: Atomic, consistent, isolated, and durable operations
- Time Travel: Query previous versions of your data
- Schema Evolution: Handle schema changes gracefully
- Unified Batch & Streaming: Same table for both workloads
- Data Versioning: Full audit history of changes
- Scalable Metadata: Handles billions of files
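All of these features rest on the same mechanism: a Delta table is ordinary Parquet data files plus a _delta_log directory of JSON commit files, and every write appends a new commit. As a quick illustration (assuming a table already written to a local filesystem path such as the /data/employees table created below), you can peek at the log directly:
# A Delta table = Parquet data files + a _delta_log transaction log.
# Sketch assumes the table lives on a local filesystem path.
import os
print(sorted(os.listdir("/data/employees/_delta_log")))
# ['00000000000000000000.json', '00000000000000000001.json', ...]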
Getting Started
# Install Delta Lake
pip install delta-spark
# Configure Spark with Delta Lake
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
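If Delta was installed with pip install delta-spark, the package also ships a small helper that adds the matching delta-core JAR for you, so the spark.jars.packages version never drifts out of sync with the Python library:
# Alternative setup: let delta-spark pin the matching JARs
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
builder = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()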
# Create a Delta table
data = [
(1, "Alice", 28, "Engineering"),
(2, "Bob", 35, "Marketing"),
(3, "Charlie", 42, "Sales")
]
df = spark.createDataFrame(data, ["id", "name", "age", "department"])
# Write as Delta table
df.write.format("delta").save("/data/employees")
# Or with SQL
spark.sql("""
CREATE TABLE employees (
id INT,
name STRING,
age INT,
department STRING
) USING DELTA
LOCATION '/data/employees'
""")
CRUD Operations
from delta.tables import DeltaTable
# Read Delta table
df = spark.read.format("delta").load("/data/employees")
# Or
df = spark.table("employees")
# Append new data
new_data = [(4, "Diana", 29, "Engineering")]
spark.createDataFrame(new_data, ["id", "name", "age", "department"]) \
.write.format("delta").mode("append").save("/data/employees")
# Update records
delta_table = DeltaTable.forPath(spark, "/data/employees")
delta_table.update(
condition="id = 1",
set={"age": "29", "department": "'Data Engineering'"}
)
# Delete records
delta_table.delete("age < 30")
# Upsert (Merge) - most powerful operation
new_employees = spark.createDataFrame([
(1, "Alice", 30, "Data Engineering"), # Update existing
(5, "Eve", 26, "Product") # Insert new
], ["id", "name", "age", "department"])
delta_table.alias("target").merge(
new_employees.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={
"name": "source.name",
"age": "source.age",
"department": "source.department"
}).whenNotMatchedInsert(values={
"id": "source.id",
"name": "source.name",
"age": "source.age",
"department": "source.department"
}).execute()
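Because the employees table is registered in the metastore, the same upsert can also be expressed in SQL with MERGE INTO; here is a sketch using a temporary view (the view name "updates" is arbitrary):
# Equivalent upsert in SQL
new_employees.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO employees AS target
    USING updates AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")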
Time Travel
# View table history
delta_table = DeltaTable.forPath(spark, "/data/employees")
delta_table.history().show()
# +-------+-------------------+------+--------+
# |version| timestamp|userId|operation|
# +-------+-------------------+------+--------+
# | 3|2024-12-24 10:30:00| null| UPDATE|
# | 2|2024-12-24 10:15:00| null| DELETE|
# | 1|2024-12-24 10:00:00| null| WRITE|
# | 0|2024-12-24 09:45:00| null| WRITE|
# +-------+-------------------+------+--------+
# Read specific version
df_v1 = spark.read.format("delta") \
.option("versionAsOf", 1) \
.load("/data/employees")
# Read at specific timestamp
df_timestamp = spark.read.format("delta") \
.option("timestampAsOf", "2024-12-24 10:00:00") \
.load("/data/employees")
# SQL syntax
spark.sql("SELECT * FROM employees VERSION AS OF 1")
spark.sql("SELECT * FROM employees TIMESTAMP AS OF '2024-12-24 10:00:00'")
# Restore to previous version
delta_table.restoreToVersion(1)
# Or
delta_table.restoreToTimestamp("2024-12-24 10:00:00")
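Restore also has a SQL form for tables registered in the catalog:
# SQL restore (recorded as a new commit, so it shows up in the history too)
spark.sql("RESTORE TABLE employees TO VERSION AS OF 1")
spark.sql("RESTORE TABLE employees TO TIMESTAMP AS OF '2024-12-24 10:00:00'")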
Schema Evolution
# Add new columns automatically
new_data_with_column = spark.createDataFrame([
(6, "Frank", 33, "HR", "frank@company.com")
], ["id", "name", "age", "department", "email"])
# Schema merge during write
new_data_with_column.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/data/employees")
# Enable auto schema evolution globally
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Change column type (with care!) - ALTER COLUMN ... TYPE is generally
# not supported for Delta tables; rewrite the table with the new type instead
from pyspark.sql.functions import col
spark.read.format("delta").load("/data/employees") \
    .withColumn("age", col("age").cast("bigint")) \
    .write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/data/employees")
# Add column with SQL
spark.sql("""
ALTER TABLE employees
ADD COLUMN hire_date DATE
""")
# View schema
delta_table.toDF().printSchema()
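Keep in mind that schema evolution is opt-in. By default Delta enforces the table schema on every write; here is a minimal sketch of what enforcement looks like, assuming the global autoMerge setting above is left at its default of false and using a hypothetical row with an extra salary column:
# Without mergeSchema (and autoMerge off), a write whose columns don't
# match the table schema is rejected with a schema mismatch error
bad_df = spark.createDataFrame(
    [(7, "Grace", 31, "Finance", 95000)],
    ["id", "name", "age", "department", "salary"]  # "salary" not in table
)
try:
    bad_df.write.format("delta").mode("append").save("/data/employees")
except Exception as e:
    print("Write rejected:", type(e).__name__)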
Medallion Architecture
Delta Lake enables the popular Bronze-Silver-Gold data organization:
┌─────────────────────────────────────────────────────────────────┐
│ Medallion Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Bronze │───▶│ Silver │───▶│ Gold │ │
│ │ Layer │ │ Layer │ │ Layer │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Raw Data Cleaned & Aggregated & │
│ As-Is Validated Business-Ready │
├─────────────────────────────────────────────────────────────────┤
│ Bronze: Raw ingestion, preserve source format │
│ Silver: Cleaned, deduplicated, standardized │
│ Gold: Aggregated, denormalized for reporting │
└─────────────────────────────────────────────────────────────────┘
# Bronze Layer - Raw ingestion
raw_data = spark.read.json("/raw/events/")
raw_data.write \
.format("delta") \
.mode("append") \
.save("/delta/bronze/events")
# Silver Layer - Cleaned data
from pyspark.sql.functions import current_timestamp, count, countDistinct
bronze_df = spark.read.format("delta").load("/delta/bronze/events")
silver_df = bronze_df \
.dropDuplicates(["event_id"]) \
.filter("event_type IS NOT NULL") \
.withColumn("processed_at", current_timestamp())
silver_df.write \
.format("delta") \
.mode("overwrite") \
.save("/delta/silver/events")
# Gold Layer - Aggregated for reporting
silver_df = spark.read.format("delta").load("/delta/silver/events")
gold_df = silver_df \
.groupBy("date", "event_type") \
.agg(
count("*").alias("event_count"),
countDistinct("user_id").alias("unique_users")
)
gold_df.write \
.format("delta") \
.mode("overwrite") \
.save("/delta/gold/event_summary")
Optimization
# Optimize (compact small files)
delta_table.optimize().executeCompaction()
# Optimize with Z-Order (co-locate related data in columns you filter on)
delta_table.optimize().executeZOrderBy("department")
# Vacuum (remove old files)
# Default retention is 7 days
delta_table.vacuum() # Use default retention
delta_table.vacuum(168) # 168 hours = 7 days
# Shorter retention (use with caution!)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(0) # Remove all old files immediately
# Auto-optimize settings (optimized writes and auto compaction;
# available on Databricks and in newer open-source Delta releases)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Analyze table for statistics
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS")
# Check table details
spark.sql("DESCRIBE DETAIL employees").show()
spark.sql("DESCRIBE HISTORY employees").show()
Streaming with Delta
# Delta as streaming source
stream_df = spark.readStream \
.format("delta") \
.load("/delta/silver/events")
# Delta as streaming sink
query = stream_df \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/gold") \
.start("/delta/gold/events")
# Process changes only (CDC) - requires the table property
# delta.enableChangeDataFeed = true (see the sketch below)
stream_df = spark.readStream \
.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.load("/delta/silver/events")
# Change data includes _change_type column:
# insert, update_preimage, update_postimage, delete
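The change feed is not on by default; it has to be enabled as a table property, and only commits made after enabling are captured:
# Enable change data feed on an existing table (captures new commits only)
spark.sql("""
    ALTER TABLE delta.`/delta/silver/events`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# For new tables: CREATE TABLE ... USING DELTA
#                 TBLPROPERTIES (delta.enableChangeDataFeed = true)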
Best Practices
- Partition wisely: Use date columns, avoid high cardinality (see the sketch after this list)
- Optimize regularly: Compact small files to improve read performance
- Use Z-Ordering: For columns frequently used in filters
- Set retention: Configure vacuum retention based on needs
- Enable auto-optimize: For write-heavy workloads
- Monitor table size: Use DESCRIBE DETAIL to track growth
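As a rough sketch of the first three points, reusing the silver events table from the medallion example (the date and user_id column names are assumed from that example): partition on the low-cardinality date column at write time, then compact and Z-Order on the high-cardinality filter column.
# Partition on a low-cardinality column; Z-Order on a high-cardinality one
from delta.tables import DeltaTable
silver_df.write \
    .format("delta") \
    .partitionBy("date") \
    .mode("overwrite") \
    .save("/delta/silver/events_by_date")
DeltaTable.forPath(spark, "/delta/silver/events_by_date") \
    .optimize() \
    .executeZOrderBy("user_id")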
Master Delta Lake
Our Data Engineering program covers Delta Lake, lakehouse architecture, and modern data platforms.
Explore Data Engineering Program