What is Medallion Architecture?

Medallion architecture is a data design pattern used to logically organize data in a lakehouse. It consists of three layers - Bronze, Silver, and Gold - each serving a specific purpose in the data refinement process.

This pattern provides a clear framework for incrementally improving data quality as it flows through each layer.

The Three Layers

┌─────────────────────────────────────────────────────────────────┐
│                    Medallion Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                     DATA SOURCES                         │    │
│  │     APIs    Databases    Files    Streams    IoT         │    │
│  └───────────────────────────┬─────────────────────────────┘    │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  🥉 BRONZE LAYER (Raw)                                   │    │
│  │  • Raw data as-is from source                            │    │
│  │  • Append-only, immutable                                │    │
│  │  • Preserve source format and schema                     │    │
│  │  • Add metadata: ingestion time, source, batch ID        │    │
│  └───────────────────────────┬─────────────────────────────┘    │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  🥈 SILVER LAYER (Cleansed)                              │    │
│  │  • Cleaned and validated data                            │    │
│  │  • Deduplicated records                                  │    │
│  │  • Standardized formats (dates, types)                   │    │
│  │  • Business rules applied                                │    │
│  │  • Conformed dimensions                                  │    │
│  └───────────────────────────┬─────────────────────────────┘    │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  🥇 GOLD LAYER (Business-Ready)                          │    │
│  │  • Aggregated and enriched                               │    │
│  │  • Denormalized for performance                          │    │
│  │  • Business-level entities                               │    │
│  │  • Ready for reporting and ML                            │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Bronze Layer

The landing zone for raw data. Keep it simple - just get the data in.

# Bronze Layer Example
from pyspark.sql.functions import current_timestamp, current_date, lit, input_file_name

# Ingest raw data with metadata
raw_events = spark.read.json("/raw/events/2024-12-24/*.json")

bronze_events = raw_events \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_ingestion_date", current_date()) \
    .withColumn("_source_file", input_file_name()) \
    .withColumn("_batch_id", lit("batch_20241224_001"))

# Save to Bronze layer (append-only)
bronze_events.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("_ingestion_date") \
    .save("/delta/bronze/events")

# Bronze table structure
"""
bronze.events
├── event_payload (original JSON as string or struct)
├── _ingestion_timestamp
├── _source_file
├── _batch_id
└── _ingestion_date (partition column)
"""

# Key principles:
# - Never modify or delete data
# - Keep original schema
# - Add metadata columns (prefix with _)
# - Partition by ingestion date for manageability
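
For continuous sources, the same Bronze principles apply with Structured Streaming. A minimal sketch, assuming a JSON source (streaming reads require an explicit schema; the schema and paths below are illustrative):

# Streaming variant of Bronze ingestion
from pyspark.sql.types import StructType, StructField, StringType

event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("payload", StringType()),
])

raw_stream = spark.readStream \
    .schema(event_schema) \
    .json("/raw/events/")

bronze_stream = raw_stream \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_ingestion_date", current_date())

bronze_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/bronze/_checkpoints/raw_events") \
    .partitionBy("_ingestion_date") \
    .start("/delta/bronze/events")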

Silver Layer

Clean, deduplicate, and standardize. This is where data quality happens.

# Silver Layer Example
from pyspark.sql.functions import (
    col, when, to_timestamp, trim, lower, row_number, current_timestamp, lit
)
from pyspark.sql.window import Window

# Read from Bronze
bronze_df = spark.read.format("delta").load("/delta/bronze/events")

# 1. Parse and flatten (keep _ingestion_timestamp for deduplication below)
silver_df = bronze_df.select(
    col("event_id"),
    col("user_id"),
    col("event_type"),
    to_timestamp(col("event_timestamp")).alias("event_time"),
    col("properties.product_id").alias("product_id"),
    col("properties.amount").cast("decimal(10,2)").alias("amount"),
    col("_ingestion_timestamp")
)

# 2. Clean and standardize
silver_df = silver_df \
    .filter(col("event_id").isNotNull()) \
    .withColumn("event_type", lower(trim(col("event_type")))) \
    .withColumn("amount", when(col("amount") < 0, 0).otherwise(col("amount")))

# 3. Deduplicate (keep the most recently ingested copy of each event)
window = Window.partitionBy("event_id").orderBy(col("_ingestion_timestamp").desc())
silver_df = silver_df \
    .withColumn("row_num", row_number().over(window)) \
    .filter(col("row_num") == 1) \
    .drop("row_num", "_ingestion_timestamp")

# 4. Add silver metadata
silver_df = silver_df \
    .withColumn("_silver_timestamp", current_timestamp()) \
    .withColumn("_is_valid", lit(True))

# 5. Merge into Silver table
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, "/delta/silver/events"):
    silver_table = DeltaTable.forPath(spark, "/delta/silver/events")
    silver_table.alias("target").merge(
        silver_df.alias("source"),
        "target.event_id = source.event_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    silver_df.write.format("delta").save("/delta/silver/events")
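
Step 4 above sets _is_valid to a constant; in production you would derive it from explicit rules and quarantine failures before the merge. A minimal sketch (the rules and quarantine path are illustrative assumptions):

# Derive _is_valid from explicit rules (replaces the constant in step 4)
validated = silver_df.withColumn(
    "_is_valid",
    col("event_id").isNotNull()
    & col("event_time").isNotNull()
    & (col("amount") >= 0)
)

# Quarantine failing rows for inspection, keep the rest for the merge
validated.filter(~col("_is_valid")) \
    .write.format("delta").mode("append") \
    .save("/delta/silver/_quarantine/events")

silver_df = validated.filter(col("_is_valid"))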

Gold Layer

Business-ready aggregations and feature tables for analytics and ML.

# Gold Layer Example - Business Aggregations
# (sum/max below are the Spark SQL functions, shadowing Python's built-ins)
from pyspark.sql.functions import (
    col, when, lit, coalesce, count, sum, countDistinct,
    avg, max, datediff, current_date
)

# Read from Silver
events = spark.read.format("delta").load("/delta/silver/events")
users = spark.read.format("delta").load("/delta/silver/users")
products = spark.read.format("delta").load("/delta/silver/products")

# Gold table: Daily Sales Summary
daily_sales = events \
    .filter(col("event_type") == "purchase") \
    .join(products, "product_id") \
    .groupBy(
        col("event_time").cast("date").alias("date"),
        col("product_category")
    ).agg(
        count("*").alias("transaction_count"),
        sum("amount").alias("total_revenue"),
        countDistinct("user_id").alias("unique_customers"),
        avg("amount").alias("avg_order_value")
    )

daily_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/daily_sales_summary")

# Gold table: Customer 360 (feature table for ML)
customer_features = events \
    .groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        countDistinct("event_type").alias("event_type_count"),
        sum(when(col("event_type") == "purchase", col("amount"))).alias("total_spent"),
        max("event_time").alias("last_activity"),
        datediff(current_date(), max("event_time")).alias("days_since_last_activity")
    ).join(users, "user_id")

customer_features.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/customer_360")

# Gold table: Product Performance (pivot produces null counts for event
# types a product never logged, so coalesce before computing the rate)
product_metrics = events \
    .filter(col("event_type").isin(["view", "add_to_cart", "purchase"])) \
    .groupBy("product_id") \
    .pivot("event_type") \
    .count() \
    .withColumn("purchase", coalesce(col("purchase"), lit(0))) \
    .withColumn("conversion_rate",
        when(col("view") > 0, col("purchase") / col("view") * 100).otherwise(0.0)
    )

product_metrics.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/product_metrics")

Folder Structure

# Recommended folder structure

/delta/
├── bronze/
│   ├── events/           # Raw event data
│   ├── users/            # Raw user data
│   ├── transactions/     # Raw transaction data
│   └── _checkpoints/     # Streaming checkpoints
│
├── silver/
│   ├── events/           # Cleaned events
│   ├── users/            # Cleaned users
│   ├── transactions/     # Cleaned transactions
│   └── dim_date/         # Conformed date dimension
│
├── gold/
│   ├── daily_sales_summary/
│   ├── customer_360/
│   ├── product_metrics/
│   └── executive_dashboard/
│
└── _schemas/             # Schema definitions
    ├── bronze/
    ├── silver/
    └── gold/
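
Registering each path as a catalog table keeps the layers queryable by name rather than by location. A minimal sketch (the database names simply mirror the folders above):

# Expose each layer through the metastore so consumers query by name
for layer in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {layer}")

spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze.events
    USING DELTA
    LOCATION '/delta/bronze/events'
""")
# Repeat per table, e.g. silver.events, gold.daily_sales_summary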

Implementation Patterns

# Pattern 1: Streaming Bronze to Silver
bronze_stream = spark.readStream \
    .format("delta") \
    .load("/delta/bronze/events")

cleaned_stream = bronze_stream \
    .filter(col("event_id").isNotNull()) \
    .withColumn("event_type", lower(col("event_type")))

cleaned_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/bronze/_checkpoints/events") \
    .trigger(processingTime="5 minutes") \
    .start("/delta/silver/events")

# Pattern 2: Incremental Silver to Gold
from delta.tables import DeltaTable

# Get last processed timestamp
last_run = spark.read.format("delta") \
    .load("/delta/gold/_metadata/last_run") \
    .collect()[0]["timestamp"]

# Process only new Silver data
new_data = spark.read.format("delta") \
    .load("/delta/silver/events") \
    .filter(col("_silver_timestamp") > last_run)

# Aggregate and merge to Gold
aggregated = new_data.groupBy("date", "product_id").agg(...)

gold_table = DeltaTable.forPath(spark, "/delta/gold/product_daily")
gold_table.alias("g").merge(
    aggregated.alias("n"),
    "g.date = n.date AND g.product_id = n.product_id"
).whenMatchedUpdate(...).whenNotMatchedInsert(...).execute()
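
Pattern 2 reads the last-run timestamp but also has to advance it after a successful merge; otherwise the same Silver rows are reprocessed on every run. A minimal sketch of that bookkeeping step, mirroring the _metadata layout read above:

# Record the new high-water mark for the next incremental run
from pyspark.sql.functions import max as spark_max

new_watermark = new_data.agg(spark_max("_silver_timestamp")).collect()[0][0]
spark.createDataFrame([(new_watermark,)], ["timestamp"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/_metadata/last_run")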

Best Practices

  • Bronze is immutable: Never update or delete raw data
  • Add metadata: Track lineage with ingestion timestamps and source info
  • Validate at Silver: Apply data quality checks before data is promoted into Silver
  • Incremental processing: Process only new/changed data
  • Schema evolution: Plan for schema changes at each layer (see the mergeSchema sketch after this list)
  • Document transformations: Clear mapping from Bronze to Gold
  • Partition wisely: Date partitioning for Bronze/Silver, business keys for Gold
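
For Delta tables, additive schema changes can be absorbed at write time with the mergeSchema option. A minimal sketch for a Bronze append (Gold schemas are usually changed deliberately rather than automatically):

# Let new source columns extend the Bronze table schema on append
bronze_events.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta/bronze/events")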

Master Medallion Architecture

Our Data Engineering program covers lakehouse architecture and production data platforms.
