MongoDB Aggregation
Introduction to Aggregation
MongoDB's Aggregation Pipeline is a powerful framework for data processing and transformation. It processes documents through a series of stages, where each stage transforms the documents as they pass through.
Basic Pipeline Structure
// Aggregation pipeline syntax
db.collection.aggregate([
{ stage1 },
{ stage2 },
{ stage3 },
// ...more stages
]);
// With Mongoose
const results = await Order.aggregate([
{ $match: { status: 'completed' } },
{ $group: { _id: '$customerId', total: { $sum: '$amount' } } },
{ $sort: { total: -1 } },
{ $limit: 10 }
]);
Common Pipeline Stages
$match - Filtering Documents
// Filter documents (like find())
// Place $match as early in the pipeline as possible: it can use indexes there and shrinks the input of every later stage
// Simple match
{ $match: { status: 'active' } }
// Multiple conditions
{ $match: {
status: 'completed',
amount: { $gte: 100 },
createdAt: { $gte: new Date('2024-01-01') }
}}
// Using $or
{ $match: {
$or: [
{ status: 'pending' },
{ status: 'processing' }
]
}}
// Regex match
{ $match: {
email: { $regex: /@gmail\.com$/i }
}}
// Example: Find active premium users
// Cut-off timestamp: exactly 30 days before now, expressed in milliseconds
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;
const loginCutoff = new Date(Date.now() - THIRTY_DAYS_MS);

// Keep only active premium accounts that logged in on or after the cut-off
const activePremiumFilter = {
  status: 'active',
  subscription: 'premium',
  lastLoginAt: { $gte: loginCutoff }
};

const premiumUsers = await User.aggregate([{ $match: activePremiumFilter }]);
$group - Grouping Documents
// Group documents and compute aggregates
// _id specifies the grouping key
// Group by single field
{ $group: {
_id: '$category',
count: { $sum: 1 },
totalSales: { $sum: '$amount' }
}}
// Group by multiple fields
{ $group: {
_id: { year: { $year: '$date' }, month: { $month: '$date' } },
revenue: { $sum: '$amount' },
orders: { $sum: 1 }
}}
// Group all documents (_id: null)
{ $group: {
_id: null,
totalRevenue: { $sum: '$amount' },
avgOrderValue: { $avg: '$amount' },
maxOrder: { $max: '$amount' },
minOrder: { $min: '$amount' },
orderCount: { $sum: 1 }
}}
// Accumulator operators
{ $group: {
_id: '$category',
count: { $sum: 1 }, // Count documents
total: { $sum: '$price' }, // Sum values
average: { $avg: '$price' }, // Average
max: { $max: '$price' }, // Maximum
min: { $min: '$price' }, // Minimum
first: { $first: '$name' }, // First in group
last: { $last: '$name' }, // Last in group
items: { $push: '$name' }, // Array of values
uniqueItems: { $addToSet: '$status' }, // Unique values
stdDev: { $stdDevPop: '$price' } // Standard deviation
}}
// Example: Sales by category
// Each stage is named so the pipeline reads top-down
const soldOnly = { $match: { status: 'sold' } };

// One output document per category with its sum, count and mean price
const perCategoryTotals = {
  $group: {
    _id: '$category',
    totalSales: { $sum: '$price' },
    itemsSold: { $sum: 1 },
    avgPrice: { $avg: '$price' }
  }
};

// Highest-grossing categories first
const bySalesDescending = { $sort: { totalSales: -1 } };

const salesByCategory = await Product.aggregate([
  soldOnly,
  perCategoryTotals,
  bySalesDescending
]);
$project - Reshaping Documents
// Include, exclude, or compute fields
// Include specific fields (1) and exclude _id (0)
{ $project: {
_id: 0,
name: 1,
email: 1,
status: 1
}}
// Rename fields
{ $project: {
userId: '$_id',
fullName: '$name',
emailAddress: '$email'
}}
// Computed fields
{ $project: {
name: 1,
totalPrice: { $multiply: ['$price', '$quantity'] },
discountedPrice: {
$subtract: [
'$price',
{ $multiply: ['$price', '$discountPercent', 0.01] }
]
}
}}
// Conditional fields
{ $project: {
name: 1,
priceCategory: {
$cond: {
if: { $gte: ['$price', 100] },
then: 'expensive',
else: 'affordable'
}
},
status: {
$switch: {
branches: [
{ case: { $eq: ['$stock', 0] }, then: 'out-of-stock' },
{ case: { $lt: ['$stock', 10] }, then: 'low-stock' }
],
default: 'in-stock'
}
}
}}
// String operations
{ $project: {
fullName: { $concat: ['$firstName', ' ', '$lastName'] },
email: { $toLower: '$email' },
initials: { $substr: ['$name', 0, 2] }
}}
// Date operations
{ $project: {
year: { $year: '$createdAt' },
month: { $month: '$createdAt' },
dayOfWeek: { $dayOfWeek: '$createdAt' },
formattedDate: {
$dateToString: { format: '%Y-%m-%d', date: '$createdAt' }
}
}}
$sort, $limit, $skip
// Sort documents (1 = ascending, -1 = descending)
{ $sort: { createdAt: -1, name: 1 } }
// Limit results
{ $limit: 10 }
// Skip documents (for pagination)
{ $skip: 20 }
// Pagination example
// The sort ends with the unique _id so documents that share a createdAt value
// keep a stable relative order; sorting on a non-unique key alone lets rows
// repeat or go missing between pages.
const page = 2;       // 1-based page number
const pageSize = 10;  // documents per page
const results = await Product.aggregate([
  { $match: { category: 'electronics' } },
  { $sort: { createdAt: -1, _id: 1 } },
  { $skip: (page - 1) * pageSize },
  { $limit: pageSize }
]);
// With total count (using $facet)
// $facet emits exactly one document shaped { data: [...], totalCount: [{ count }] }.
// totalCount is an empty array when no document matches, because $count
// produces no output for empty input.
const results = await Product.aggregate([
  { $match: { category: 'electronics' } },
  { $facet: {
    data: [
      // The unique _id tie-breaker keeps page boundaries deterministic when
      // several documents share the same createdAt value
      { $sort: { createdAt: -1, _id: 1 } },
      { $skip: (page - 1) * pageSize },
      { $limit: pageSize }
    ],
    totalCount: [
      { $count: 'count' }
    ]
  }}
]);
$lookup - Joining Collections
// Join with another collection (like SQL LEFT JOIN)
// Basic lookup
{ $lookup: {
from: 'users', // Collection to join
localField: 'userId', // Field in current collection
foreignField: '_id', // Field in foreign collection
as: 'user' // Output array field
}}
// Unwind to convert array to object (for one-to-one); documents with no match are dropped unless preserveNullAndEmptyArrays is true
[
{ $lookup: {
from: 'users',
localField: 'userId',
foreignField: '_id',
as: 'user'
}},
{ $unwind: '$user' } // Convert array to single object
]
// Pipeline lookup (more control)
{ $lookup: {
from: 'orders',
let: { userId: '$_id' }, // Variables from current doc
pipeline: [
{ $match: {
$expr: { $eq: ['$customerId', '$$userId'] }
}},
{ $match: { status: 'completed' } },
{ $sort: { createdAt: -1 } },
{ $limit: 5 }
],
as: 'recentOrders'
}}
// Example: Users with their order stats
const usersWithStats = await User.aggregate([
{ $lookup: {
from: 'orders',
let: { userId: '$_id' },
pipeline: [
{ $match: {
$expr: { $eq: ['$customerId', '$$userId'] },
status: 'completed'
}},
{ $group: {
_id: null,
totalOrders: { $sum: 1 },
totalSpent: { $sum: '$amount' }
}}
],
as: 'orderStats'
}},
{ $unwind: { path: '$orderStats', preserveNullAndEmptyArrays: true } },
{ $project: {
name: 1,
email: 1,
totalOrders: { $ifNull: ['$orderStats.totalOrders', 0] },
totalSpent: { $ifNull: ['$orderStats.totalSpent', 0] }
}}
]);
$unwind - Deconstructing Arrays
// Deconstruct array field into multiple documents
// Sample document: { _id: 1, tags: ['a', 'b', 'c'] }
// Basic unwind
{ $unwind: '$tags' }
// Result: 3 documents, one for each tag
// Preserve documents with empty/missing arrays
{ $unwind: {
path: '$tags',
preserveNullAndEmptyArrays: true
}}
// Include array index
{ $unwind: {
path: '$items',
includeArrayIndex: 'itemIndex'
}}
// Example: Analyze order items
const itemAnalysis = await Order.aggregate([
{ $unwind: '$items' },
{ $group: {
_id: '$items.productId',
totalQuantity: { $sum: '$items.quantity' },
totalRevenue: { $sum: { $multiply: ['$items.price', '$items.quantity'] } },
orderCount: { $sum: 1 }
}},
{ $lookup: {
from: 'products',
localField: '_id',
foreignField: '_id',
as: 'product'
}},
{ $unwind: '$product' },
{ $project: {
productName: '$product.name',
totalQuantity: 1,
totalRevenue: 1,
orderCount: 1
}},
{ $sort: { totalRevenue: -1 } }
]);
Advanced Stages
$facet - Multiple Pipelines
// Run multiple pipelines in single aggregation
// Dashboard data in one query
const dashboard = await Order.aggregate([
{ $match: { createdAt: { $gte: startOfMonth } } },
{ $facet: {
// Revenue by day
dailyRevenue: [
{ $group: {
_id: { $dateToString: { format: '%Y-%m-%d', date: '$createdAt' } },
revenue: { $sum: '$amount' }
}},
{ $sort: { _id: 1 } }
],
// Top products
topProducts: [
{ $unwind: '$items' },
{ $group: {
_id: '$items.productId',
sales: { $sum: '$items.quantity' }
}},
{ $sort: { sales: -1 } },
{ $limit: 5 }
],
// Order status counts
statusCounts: [
{ $group: {
_id: '$status',
count: { $sum: 1 }
}}
],
// Summary stats
summary: [
{ $group: {
_id: null,
totalOrders: { $sum: 1 },
totalRevenue: { $sum: '$amount' },
avgOrderValue: { $avg: '$amount' }
}}
]
}}
]);
$bucket and $bucketAuto
// Categorize documents into buckets
// Manual bucket boundaries
{ $bucket: {
groupBy: '$price',
boundaries: [0, 50, 100, 200, 500, 1000],
default: 'Other',
output: {
count: { $sum: 1 },
products: { $push: '$name' }
}
}}
// Automatic buckets
{ $bucketAuto: {
groupBy: '$price',
buckets: 5, // Number of buckets
output: {
count: { $sum: 1 },
avgPrice: { $avg: '$price' }
}
}}
// Example: User age distribution
const ageDistribution = await User.aggregate([
{ $bucket: {
groupBy: '$age',
boundaries: [0, 18, 25, 35, 45, 55, 65, 100],
default: 'Unknown',
output: {
count: { $sum: 1 },
users: { $push: '$name' }
}
}}
]);
$addFields and $set
// Add or modify fields (keeps existing fields)
// Add computed fields
{ $addFields: {
totalPrice: { $multiply: ['$price', '$quantity'] },
fullName: { $concat: ['$firstName', ' ', '$lastName'] }
}}
// $set is an alias for $addFields (available since MongoDB 4.2)
{ $set: {
lastModified: new Date(),
status: 'processed'
}}
// Conditional field
{ $addFields: {
ageGroup: {
$switch: {
branches: [
{ case: { $lt: ['$age', 18] }, then: 'minor' },
{ case: { $lt: ['$age', 30] }, then: 'young-adult' },
{ case: { $lt: ['$age', 50] }, then: 'adult' }
],
default: 'senior'
}
}
}}
$redact - Document-Level Access Control
// Control document/field access based on conditions
{ $redact: {
$cond: {
if: { $eq: ['$securityLevel', 'public'] },
then: '$$DESCEND', // Include field/subdocument
else: '$$PRUNE' // Exclude field/subdocument
}
}}
// $$DESCEND - keep this level and check nested
// $$PRUNE - remove this level
// $$KEEP - keep this level and all nested
// Example: Filter sensitive data based on user role
/**
 * Builds a $redact stage that keeps a document (or embedded document) when its
 * `accessLevel` is 'public' or when `userRole` appears in its `allowedRoles`
 * array, and prunes it otherwise.
 *
 * @param {string} userRole - Role of the requesting user.
 * @returns {object} A `$redact` aggregation stage.
 */
const filterByRole = (userRole) => ({
  $redact: {
    $cond: {
      if: {
        $or: [
          { $eq: ['$accessLevel', 'public'] },
          {
            $in: [
              // $literal stops a role such as '$accessLevel' from being
              // interpreted as a field path instead of a plain string
              { $literal: userRole },
              // $in throws unless its second operand is an array, and
              // $$DESCEND re-evaluates this condition on every embedded
              // document, many of which may carry no allowedRoles field
              { $cond: [{ $isArray: ['$allowedRoles'] }, '$allowedRoles', []] }
            ]
          }
        ]
      },
      then: '$$DESCEND',
      else: '$$PRUNE'
    }
  }
});
Practical Examples
E-commerce Analytics
// Monthly sales report
const monthlySales = await Order.aggregate([
{ $match: {
status: 'completed',
createdAt: { $gte: new Date('2024-01-01') }
}},
{ $group: {
_id: {
year: { $year: '$createdAt' },
month: { $month: '$createdAt' }
},
totalRevenue: { $sum: '$amount' },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: '$amount' },
uniqueCustomers: { $addToSet: '$customerId' }
}},
{ $addFields: {
customerCount: { $size: '$uniqueCustomers' }
}},
{ $project: {
_id: 0,
year: '$_id.year',
month: '$_id.month',
totalRevenue: { $round: ['$totalRevenue', 2] },
orderCount: 1,
avgOrderValue: { $round: ['$avgOrderValue', 2] },
customerCount: 1
}},
{ $sort: { year: 1, month: 1 } }
]);
// Customer lifetime value (top 100 customers by total spend)
const customerLTV = await Order.aggregate([
  { $match: { status: 'completed' } },
  // One group per customer with spend totals and first/last order dates
  { $group: {
    _id: '$customerId',
    totalSpent: { $sum: '$amount' },
    orderCount: { $sum: 1 },
    firstOrder: { $min: '$createdAt' },
    lastOrder: { $max: '$createdAt' },
    avgOrderValue: { $avg: '$amount' }
  }},
  // Rank and trim BEFORE the join so $lookup runs for at most 100 groups
  // instead of once per customer; _id breaks ties deterministically.
  { $sort: { totalSpent: -1, _id: 1 } },
  { $limit: 100 },
  { $lookup: {
    from: 'users',
    localField: '_id',
    foreignField: '_id',
    as: 'customer'
  }},
  // Drops groups whose user document is missing, so fewer than 100 rows
  // may be returned in that case
  { $unwind: '$customer' },
  // $lookup, $unwind and $project keep the incoming order, so the result
  // stays sorted by totalSpent descending
  { $project: {
    customerName: '$customer.name',
    email: '$customer.email',
    totalSpent: 1,
    orderCount: 1,
    avgOrderValue: { $round: ['$avgOrderValue', 2] },
    customerSince: '$firstOrder',
    // Date difference is in milliseconds; dividing yields fractional days
    daysSinceLastOrder: {
      $divide: [
        { $subtract: [new Date(), '$lastOrder'] },
        1000 * 60 * 60 * 24
      ]
    }
  }}
]);
User Activity Analysis
// User engagement metrics
const userEngagement = await Activity.aggregate([
{ $match: {
timestamp: { $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) }
}},
{ $group: {
_id: '$userId',
totalActions: { $sum: 1 },
uniqueDays: { $addToSet: {
$dateToString: { format: '%Y-%m-%d', date: '$timestamp' }
}},
actionTypes: { $push: '$action' },
lastActivity: { $max: '$timestamp' }
}},
{ $addFields: {
activeDays: { $size: '$uniqueDays' },
actionBreakdown: {
$arrayToObject: {
$map: {
input: { $setUnion: '$actionTypes' },
as: 'action',
in: {
k: '$$action',
v: {
$size: {
$filter: {
input: '$actionTypes',
cond: { $eq: ['$$this', '$$action'] }
}
}
}
}
}
}
}
}},
{ $lookup: {
from: 'users',
localField: '_id',
foreignField: '_id',
as: 'user'
}},
{ $unwind: '$user' },
{ $project: {
userName: '$user.name',
totalActions: 1,
activeDays: 1,
avgActionsPerDay: { $divide: ['$totalActions', '$activeDays'] },
actionBreakdown: 1,
lastActivity: 1
}},
{ $sort: { totalActions: -1 } }
]);
Time-Series Analysis
// Hourly traffic analysis
const hourlyTraffic = await PageView.aggregate([
{ $match: {
timestamp: {
$gte: new Date(Date.now() - 24 * 60 * 60 * 1000)
}
}},
{ $group: {
_id: { $hour: '$timestamp' },
views: { $sum: 1 },
uniqueVisitors: { $addToSet: '$sessionId' },
avgLoadTime: { $avg: '$loadTime' }
}},
{ $addFields: {
uniqueVisitorCount: { $size: '$uniqueVisitors' }
}},
{ $project: {
hour: '$_id',
views: 1,
uniqueVisitorCount: 1,
avgLoadTime: { $round: ['$avgLoadTime', 2] }
}},
{ $sort: { hour: 1 } }
]);
// Moving average (7-day) - $setWindowFields requires MongoDB 5.0+
const movingAverage = await DailyStats.aggregate([
{ $setWindowFields: {
sortBy: { date: 1 },
output: {
movingAvg: {
$avg: '$revenue',
window: {
documents: [-6, 0] // Current + 6 previous days
}
}
}
}}
]);
Performance Optimization
// 1. Use $match early to filter data
[
{ $match: { status: 'active' } }, // Filter first!
{ $group: { ... } }
]
// 2. Create indexes for $match and $sort fields
db.orders.createIndex({ status: 1, createdAt: -1 });
// 3. Use $project to limit fields early
[
{ $match: { status: 'active' } },
{ $project: { amount: 1, customerId: 1 } }, // Only needed fields
{ $group: { ... } }
]
// 4. Use allowDiskUse for large datasets
const results = await Order.aggregate([
{ $group: { ... } }
]).allowDiskUse(true);
// 5. Explain the aggregation
const explanation = await Order.aggregate([
{ $match: { status: 'completed' } },
{ $group: { _id: '$customerId', total: { $sum: '$amount' } } }
]).explain('executionStats');
// 6. Use $limit early when possible
[
{ $match: { ... } },
{ $sort: { createdAt: -1 } },
{ $limit: 100 }, // Limit before expensive operations
{ $lookup: { ... } }
]
Key Takeaways
- Aggregation pipeline processes documents through sequential stages
- Use $match early to filter data and improve performance
- $group enables powerful calculations with accumulator operators
- $lookup joins collections (like SQL LEFT OUTER JOIN)
- $facet runs multiple pipelines in one aggregation
- $unwind deconstructs arrays into separate documents
- Create indexes on fields used in $match and $sort
- Use allowDiskUse(true) for memory-intensive operations that may exceed the 100 MB per-stage memory limit
