What is Delta Lake?

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and data lakes. It adds reliability, data quality, and performance on top of an existing data lake, enabling what's known as the "lakehouse" architecture.

Originally developed by Databricks, Delta Lake solves common data lake problems like data corruption, failed writes, and inconsistent reads.

Why Delta Lake?

  • ACID Transactions: Atomic, consistent, isolated, and durable operations
  • Time Travel: Query previous versions of your data
  • Schema Evolution: Handle schema changes gracefully
  • Unified Batch & Streaming: Same table for both workloads
  • Data Versioning: Full audit history of changes
  • Scalable Metadata: Handles billions of files
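
All of these capabilities rest on Delta's transaction log: every commit appends a zero-padded JSON file under the table's _delta_log directory, and readers reconstruct the table state from that log. A minimal way to see it, assuming a Delta table already exists at /data/employees on a local filesystem (the table created in the next section would do):

# Peek at the transaction log of an existing Delta table
# Each JSON file is one committed version; periodic checkpoints are Parquet files
import os

log_dir = "/data/employees/_delta_log"
for name in sorted(os.listdir(log_dir)):
    print(name)   # e.g. 00000000000000000000.json, 00000000000000000001.json, ...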

Getting Started

# Install Delta Lake
pip install delta-spark

# Configure Spark with Delta Lake
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create a Delta table
data = [
    (1, "Alice", 28, "Engineering"),
    (2, "Bob", 35, "Marketing"),
    (3, "Charlie", 42, "Sales")
]
df = spark.createDataFrame(data, ["id", "name", "age", "department"])

# Write as Delta table
df.write.format("delta").save("/data/employees")

# Or with SQL
spark.sql("""
    CREATE TABLE employees (
        id INT,
        name STRING,
        age INT,
        department STRING
    ) USING DELTA
    LOCATION '/data/employees'
""")

CRUD Operations

from delta.tables import DeltaTable

# Read Delta table
df = spark.read.format("delta").load("/data/employees")
# Or
df = spark.table("employees")

# Append new data
new_data = [(4, "Diana", 29, "Engineering")]
spark.createDataFrame(new_data, ["id", "name", "age", "department"]) \
    .write.format("delta").mode("append").save("/data/employees")

# Update records
delta_table = DeltaTable.forPath(spark, "/data/employees")

delta_table.update(
    condition="id = 1",
    set={"age": "29", "department": "'Data Engineering'"}
)

# Delete records
delta_table.delete("age < 30")

# Upsert (Merge) - most powerful operation
new_employees = spark.createDataFrame([
    (1, "Alice", 30, "Data Engineering"),  # Update existing
    (5, "Eve", 26, "Product")              # Insert new
], ["id", "name", "age", "department"])

delta_table.alias("target").merge(
    new_employees.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "age": "source.age",
    "department": "source.department"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age",
    "department": "source.department"
}).execute()
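
When the source and target share column names, the column-by-column maps above can be collapsed into the update-all/insert-all shortcuts; the merge is the same, with less boilerplate:

# Equivalent upsert using the *All shortcuts (columns are matched by name)
delta_table.alias("target").merge(
    new_employees.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()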

Time Travel

# View table history
delta_table = DeltaTable.forPath(spark, "/data/employees")
delta_table.history().show()

# +-------+-------------------+------+---------+
# |version|          timestamp|userId|operation|
# +-------+-------------------+------+---------+
# |      3|2024-12-24 10:30:00|  null|   UPDATE|
# |      2|2024-12-24 10:15:00|  null|   DELETE|
# |      1|2024-12-24 10:00:00|  null|    WRITE|
# |      0|2024-12-24 09:45:00|  null|    WRITE|
# +-------+-------------------+------+---------+

# Read specific version
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/data/employees")

# Read at specific timestamp
df_timestamp = spark.read.format("delta") \
    .option("timestampAsOf", "2024-12-24 10:00:00") \
    .load("/data/employees")

# SQL syntax
spark.sql("SELECT * FROM employees VERSION AS OF 1")
spark.sql("SELECT * FROM employees TIMESTAMP AS OF '2024-12-24 10:00:00'")

# Restore to previous version
delta_table.restoreToVersion(1)
# Or
delta_table.restoreToTimestamp("2024-12-24 10:00:00")

Schema Evolution

# Add new columns automatically
new_data_with_column = spark.createDataFrame([
    (6, "Frank", 33, "HR", "frank@company.com")
], ["id", "name", "age", "department", "email"])

# Schema merge during write
new_data_with_column.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/data/employees")

# Enable automatic schema evolution for MERGE operations (applies session-wide)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Changing a column's type isn't supported via ALTER TABLE in Delta 2.x;
# rewrite the table with the new schema instead (overwriteSchema)
from pyspark.sql.functions import col

spark.read.format("delta").load("/data/employees") \
    .withColumn("age", col("age").cast("bigint")) \
    .write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/data/employees")

# Add column with SQL
spark.sql("""
    ALTER TABLE employees
    ADD COLUMN hire_date DATE
""")

# View schema
delta_table.toDF().printSchema()

Medallion Architecture

Delta Lake enables the popular Bronze-Silver-Gold data organization:

┌─────────────────────────────────────────────────────────────────┐
│                    Medallion Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐                   │
│  │  Bronze  │───▶│  Silver  │───▶│   Gold   │                   │
│  │   Layer  │    │   Layer  │    │   Layer  │                   │
│  └──────────┘    └──────────┘    └──────────┘                   │
│                                                                  │
│  Raw Data        Cleaned &       Aggregated &                    │
│  As-Is           Validated       Business-Ready                  │
├─────────────────────────────────────────────────────────────────┤
│  Bronze: Raw ingestion, preserve source format                   │
│  Silver: Cleaned, deduplicated, standardized                     │
│  Gold: Aggregated, denormalized for reporting                    │
└─────────────────────────────────────────────────────────────────┘

from pyspark.sql.functions import current_timestamp, count, countDistinct

# Bronze Layer - Raw ingestion
raw_data = spark.read.json("/raw/events/")
raw_data.write \
    .format("delta") \
    .mode("append") \
    .save("/delta/bronze/events")

# Silver Layer - Cleaned data
bronze_df = spark.read.format("delta").load("/delta/bronze/events")

silver_df = bronze_df \
    .dropDuplicates(["event_id"]) \
    .filter("event_type IS NOT NULL") \
    .withColumn("processed_at", current_timestamp())

silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/silver/events")

# Gold Layer - Aggregated for reporting
silver_df = spark.read.format("delta").load("/delta/silver/events")

gold_df = silver_df \
    .groupBy("date", "event_type") \
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_id").alias("unique_users")
    )

gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/event_summary")

Optimization

# Optimize (compact small files)
delta_table.optimize().executeCompaction()

# Optimize with Z-Order (co-locate related data)
delta_table.optimize().executeZOrderBy("date", "customer_id")

# Vacuum (remove old files)
# Default retention is 7 days
delta_table.vacuum()        # Use default retention
delta_table.vacuum(168)     # 168 hours = 7 days

# Shorter retention (use with caution!)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(0)  # Remove all old files immediately

# Auto-optimize settings
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# Analyze table for statistics
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS")

# Check table details
spark.sql("DESCRIBE DETAIL employees").show()
spark.sql("DESCRIBE HISTORY employees").show()

Streaming with Delta

# Delta as streaming source
stream_df = spark.readStream \
    .format("delta") \
    .load("/delta/silver/events")

# Delta as streaming sink
query = stream_df \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/gold") \
    .start("/delta/gold/events")

# Process changes only (Change Data Feed / CDC)
# Requires delta.enableChangeDataFeed = true on the source table (see below)
stream_df = spark.readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("/delta/silver/events")

# Change data includes _change_type column:
# insert, update_preimage, update_postimage, delete
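
The change feed is opt-in: it has to be switched on as a table property before changes are captured, and downstream consumers typically keep only inserts and post-update images. A short sketch:

# Enable the change data feed on an existing table (captures changes from now on)
spark.sql("""
    ALTER TABLE delta.`/delta/silver/events`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Keep only rows that represent the new state of the data
changes = stream_df.filter("_change_type IN ('insert', 'update_postimage')")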

Best Practices

  • Partition wisely: Use date columns, avoid high cardinality
  • Optimize regularly: Compact small files to improve read performance
  • Use Z-Ordering: For columns frequently used in filters
  • Set retention: Configure vacuum retention based on needs
  • Enable auto-optimize: For write-heavy workloads
  • Monitor table size: Use DESCRIBE DETAIL to track growth
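
A short sketch that ties several of these together, partitioning the gold table by date and pinning auto-optimize and vacuum retention as table properties (the property names mirror the session configs shown above; values are illustrative):

# Partition on a low-cardinality date column; overwriteSchema is needed
# because the earlier gold write was unpartitioned
gold_df.write.format("delta") \
    .partitionBy("date") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/delta/gold/event_summary")

# Persist optimization and retention settings on the table itself
spark.sql("""
    ALTER TABLE delta.`/delta/gold/event_summary`
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true',
        'delta.deletedFileRetentionDuration' = 'interval 7 days'
    )
""")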

Master Delta Lake

Our Data Engineering program covers Delta Lake, lakehouse architecture, and modern data platforms.

Explore Data Engineering Program
