Overview of Aggregation Pipeline
The aggregation pipeline is a powerful tool in MongoDB for processing data streams, enabling complex transformations and analysis of documents by chaining multiple operation stages together. Each stage receives the output from the previous stage as input and generates a new set of documents to pass to the next stage.
Basic Structure of the Aggregation Pipeline
The aggregation pipeline consists of multiple stages, each performing specific operations on the data. A typical pipeline structure looks like this:
db.collection.aggregate([
{ $match: { status: "A" } },
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
])
This example shows a three-stage pipeline:
- The $match stage filters documents with status "A"
- The $group stage groups by cust_id and calculates the sum of amount
- The $sort stage sorts the results by total in descending order
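To make the stage-by-stage flow concrete, here is a plain JavaScript sketch that mimics the three stages on an in-memory array. It is an analogy only, not MongoDB code: the sample documents and the helper names matchStage, groupStage, and sortStage are invented for illustration.

```javascript
// Plain JavaScript analogy of the pipeline above; MongoDB itself is not involved.
// The sample documents are invented for illustration.
const sampleOrders = [
  { cust_id: "c1", amount: 50, status: "A" },
  { cust_id: "c2", amount: 200, status: "A" },
  { cust_id: "c1", amount: 75, status: "A" },
  { cust_id: "c3", amount: 999, status: "B" }
];

// Like { $match: { status: "A" } }
const matchStage = docs => docs.filter(d => d.status === "A");

// Like { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
const groupStage = docs => {
  const totals = new Map();
  for (const d of docs) {
    totals.set(d.cust_id, (totals.get(d.cust_id) || 0) + d.amount);
  }
  return [...totals].map(([_id, total]) => ({ _id, total }));
};

// Like { $sort: { total: -1 } }
const sortStage = docs => [...docs].sort((a, b) => b.total - a.total);

// Each stage consumes the output of the previous stage.
const pipelineResult = [matchStage, groupStage, sortStage].reduce(
  (docs, stage) => stage(docs),
  sampleOrders
);
console.log(pipelineResult);
```

The reduce call is the important part: no stage sees the original collection except the first one.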
Detailed Explanation of Common Aggregation Stages
$match Stage
$match is used to filter documents, similar to query conditions in the find() method. It can reduce the number of documents processed in subsequent stages, improving performance.
{ $match: {
age: { $gt: 18 },
department: "engineering",
joinDate: { $gte: new Date("2020-01-01") }
} }
$group Stage
$group is the core stage of the aggregation pipeline, used for grouping and calculating aggregated values. It must specify an _id field as the grouping key.
{ $group: {
_id: "$department",
averageSalary: { $avg: "$salary" },
maxAge: { $max: "$age" },
employeeCount: { $sum: 1 }
} }
$project Stage
$project reshapes document structures, allowing inclusion, exclusion, or renaming of fields, as well as creating computed fields.
{ $project: {
name: 1,
department: 1,
monthlySalary: { $divide: ["$salary", 12] },
_id: 0
} }
$sort Stage
$sort sorts documents, where 1 indicates ascending order and -1 indicates descending order.
{ $sort: {
department: 1,
salary: -1
} }
$limit and $skip Stages
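These stages are used for pagination: $skip discards the first n documents and $limit caps how many are passed on. The example below returns documents 21 through 30: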
{ $skip: 20 },
{ $limit: 10 }
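The skip value is normally derived from a page number. The sketch below shows one way to compute it; the helper name pageStages and its parameters are hypothetical, and the sort fields are only an example. A $sort is included because pages are only stable when the order is deterministic.

```javascript
// Hypothetical helper: builds the stages for one page of results.
// `page` is 1-based; `sortSpec` should include a unique field (such as _id)
// so that the order is the same every time a page is requested.
function pageStages(page, pageSize, sortSpec) {
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError("page must be a positive integer");
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError("pageSize must be a positive integer");
  }
  return [
    { $sort: sortSpec },
    { $skip: (page - 1) * pageSize },
    { $limit: pageSize }
  ];
}

// Page 3 with 10 documents per page skips the first 20 documents.
const pagingStages = pageStages(3, 10, { salary: -1, _id: 1 });
console.log(JSON.stringify(pagingStages));
```

The returned array can be appended to the end of any pipeline with the spread operator.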
Advanced Aggregation Operations
Array Operations
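MongoDB provides rich array operators. $unwind is a stage, while $addToSet and $push are accumulators used inside $group:
{ $unwind: "$tags" }, // Expands an array into one document per element
{ $group: {
_id: "$department", // Grouping key chosen for illustration
skills: { $addToSet: "$skills" }, // Collects unique values (deduplicated)
comments: { $push: "$newComment" } // Appends every value to an array
} }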
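As a rough mental model of what $unwind does to a single array field, consider the plain JavaScript sketch below. The function name unwind and the sample documents are illustrative only. It mirrors the default behavior, in which documents whose array is missing, null, or empty produce no output.

```javascript
// Plain JavaScript analogy of { $unwind: "$tags" } for a top-level field.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined || value === null) {
      continue; // missing or null field: the document is dropped
    }
    if (!Array.isArray(value)) {
      out.push(doc); // a non-array value is treated like a one-element array
      continue;
    }
    // One output document per array element; an empty array yields nothing.
    for (const element of value) {
      out.push({ ...doc, [field]: element });
    }
  }
  return out;
}

const unwoundTags = unwind(
  [
    { _id: 1, tags: ["a", "b"] },
    { _id: 2, tags: [] },
    { _id: 3 }
  ],
  "tags"
);
console.log(unwoundTags);
```

In a real pipeline, the document form { $unwind: { path: "$tags", preserveNullAndEmptyArrays: true } } keeps the documents that the default form drops.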
Conditional Expressions
Conditional logic can be used in aggregations:
{ $project: {
name: 1,
bonus: {
$cond: {
if: { $gte: ["$sales", 10000] },
then: 1000,
else: 0
}
}
} }
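When there are more than two outcomes, nested $cond expressions become hard to read, and the $switch operator is a common alternative. The sketch below generates a $switch expression from a list of tiers. The helper name tieredBonus and the second tier are assumptions made for illustration and do not come from the example above.

```javascript
// Hypothetical helper: turns [{ minSales, bonus }, ...] into a $switch expression.
// Tiers are ordered from the highest threshold down, because $switch uses the
// first branch whose `case` evaluates to true.
function tieredBonus(tiers, salesField = "$sales") {
  const branches = [...tiers]
    .sort((a, b) => b.minSales - a.minSales)
    .map(t => ({ case: { $gte: [salesField, t.minSales] }, then: t.bonus }));
  return { $switch: { branches, default: 0 } };
}

const bonusStage = {
  $project: {
    name: 1,
    bonus: tieredBonus([
      { minSales: 10000, bonus: 1000 },
      { minSales: 50000, bonus: 5000 } // illustrative extra tier
    ])
  }
};
console.log(JSON.stringify(bonusStage, null, 2));
```

With a single tier the generated expression is equivalent to the $cond form shown above.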
Date Handling
MongoDB provides date operators:
{ $project: {
year: { $year: "$joinDate" },
month: { $month: "$joinDate" },
dayOfWeek: { $dayOfWeek: "$joinDate" }
} }
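The numbering used by these operators differs from JavaScript's Date methods, which is a frequent source of off-by-one errors. The plain JavaScript sketch below (the function name mongoDateParts is illustrative) reproduces the values the three operators return when no timezone is specified, in which case MongoDB evaluates the date in UTC.

```javascript
// Plain JavaScript sketch of the values $year, $month and $dayOfWeek produce
// for a date evaluated in UTC.
function mongoDateParts(date) {
  return {
    year: date.getUTCFullYear(),
    month: date.getUTCMonth() + 1,   // $month: 1 (January) to 12 (December)
    dayOfWeek: date.getUTCDay() + 1  // $dayOfWeek: 1 (Sunday) to 7 (Saturday)
  };
}

// 1 January 2020 was a Wednesday, so dayOfWeek is 4.
const dateParts = mongoDateParts(new Date("2020-01-01T00:00:00Z"));
console.log(dateParts);
```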
Performance Optimization Tips
Index Utilization
Ensure $match and $sort stages can use indexes:
- Place $match at the beginning of the pipeline
- Use $match before sorting to reduce document count
- Ensure sorted fields have indexes
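The first two rules can be checked mechanically. The sketch below is a hypothetical lint helper (indexWarnings is not a MongoDB function) that flags pipeline shapes that usually prevent index use. It is a rough heuristic under the assumption that each stage object has exactly one key, and it does not replace reading the query plan.

```javascript
// Hypothetical lint helper: flags pipeline shapes that usually prevent
// $match and $sort from using an index. Heuristic only.
function indexWarnings(pipeline) {
  const names = pipeline.map(stage => Object.keys(stage)[0]);
  const warnings = [];
  if (names.length > 0 && names[0] !== "$match") {
    warnings.push("pipeline does not start with $match");
  }
  const sortAt = names.indexOf("$sort");
  // A $sort generally needs to be first, or preceded only by $match stages.
  if (sortAt > 0 && names.slice(0, sortAt).some(n => n !== "$match")) {
    warnings.push("$sort is preceded by a stage other than $match");
  }
  return warnings;
}

const goodShape = indexWarnings([
  { $match: { status: "A" } },
  { $sort: { total: -1 } }
]);
const badShape = indexWarnings([
  { $unwind: "$items" },
  { $sort: { "items.price": -1 } }
]);
console.log(goodShape, badShape);
```

To confirm what the server actually does, mongosh can print the plan with db.collection.explain().aggregate(pipeline).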
Memory Limits
Each pipeline stage has a default 100MB memory limit. For large datasets:
- Use the allowDiskUse option
- Optimize the pipeline to reduce intermediate result size
- Use $match and $project early to minimize data volume
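The allowDiskUse option is passed as the second argument to aggregate(). The sketch below wraps that call; the wrapper name aggregateLarge and the stand-in collection object are assumptions made so that the snippet runs without a database. In mongosh the same call would be made on a real collection such as db.orders.

```javascript
// Sketch: run a pipeline with allowDiskUse so that stages exceeding the
// memory limit can write temporary files instead of failing.
// `collection` is assumed to expose aggregate(pipeline, options).
function aggregateLarge(collection, pipeline) {
  return collection.aggregate(pipeline, { allowDiskUse: true });
}

// Stand-in collection, used only to show what is passed through.
const fakeCollection = {
  aggregate(pipeline, options) {
    return { pipeline, options };
  }
};

const largeCall = aggregateLarge(fakeCollection, [
  { $match: { status: "A" } },               // filter first
  { $project: { cust_id: 1, amount: 1 } },   // keep only the needed fields
  { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
]);
console.log(largeCall.options);
```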
Sharded Cluster Considerations
On sharded clusters:
- If the first stage is $match, it executes on the shards
- The $group stage typically requires a merge step that combines the partial results from each shard
- Consider using the $out stage to write results to a new collection
Practical Application Examples
E-commerce Data Analysis
db.orders.aggregate([
{ $match: {
orderDate: {
$gte: new Date("2023-01-01"),
$lt: new Date("2023-02-01")
}
} },
{ $unwind: "$items" },
{ $group: {
_id: "$items.category",
totalSales: { $sum: "$items.price" },
avgQuantity: { $avg: "$items.quantity" },
orderCount: { $sum: 1 } // Counts unwound items, not distinct orders
} },
{ $sort: { totalSales: -1 } },
{ $limit: 5 }
])
User Behavior Analysis
db.userActivities.aggregate([
{ $match: {
timestamp: { $gte: new Date("2023-06-01") }
} },
{ $group: {
_id: {
userId: "$userId",
action: "$actionType"
},
count: { $sum: 1 },
lastOccurred: { $max: "$timestamp" }
} },
{ $project: {
userId: "$_id.userId",
action: "$_id.action",
count: 1,
lastOccurred: 1,
isFrequent: { $gt: ["$count", 10] }
} },
{ $sort: { count: -1 } }
])
Pipeline Expressions and Operators
MongoDB provides a rich set of expressions and operators:
Arithmetic Operators
{ $project: {
netPrice: { $subtract: ["$price", "$discount"] },
tax: { $multiply: ["$price", 0.1] },
finalPrice: {
$add: [
{ $subtract: ["$price", "$discount"] },
{ $multiply: ["$price", 0.1] }
]
}
} }
String Operations
{ $project: {
fullName: { $concat: ["$firstName", " ", "$lastName"] },
emailDomain: { $substrBytes: ["$email", { $add: [{ $indexOfBytes: ["$email", "@"] }, 1] }, -1] }, // Starts one byte after "@"
nameLength: { $strLenCP: "$name" }
} }
Aggregation Accumulators
{ $group: {
_id: null,
total: { $sum: "$value" },
average: { $avg: "$value" },
min: { $min: "$value" },
max: { $max: "$value" },
values: { $push: "$value" },
uniqueValues: { $addToSet: "$value" }
} }
Importance of Pipeline Stage Order
The order of stages in the aggregation pipeline significantly impacts results and performance:
- Use $match early to reduce document count
- Using $sort before $group can optimize certain grouping operations
- $project can be placed at different positions for different purposes:
  - Early to reduce field count
  - Late to reshape output format
Example of an inefficient order, followed by an optimized alternative:
// Inefficient order: every order is unwound before any filtering happens
db.orders.aggregate([
{ $project: { items: 1 } },
{ $unwind: "$items" },
{ $match: { "items.price": { $gt: 100 } } }
])
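// Optimized order: the first $match drops orders with no qualifying item,
// and the second $match keeps only the qualifying items after $unwind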
db.orders.aggregate([
{ $match: { "items.price": { $gt: 100 } } },
{ $unwind: "$items" },
{ $match: { "items.price": { $gt: 100 } } },
{ $project: { items: 1 } }
])
Special Stages and Features
$facet Stage
$facet allows executing multiple sub-pipelines within a single aggregation pipeline:
db.products.aggregate([
{ $facet: {
"priceStats": [
{ $match: { category: "Electronics" } },
{ $group: {
_id: null,
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" }
} }
],
"topSellers": [
{ $sort: { sales: -1 } },
{ $limit: 5 },
{ $project: { name: 1, sales: 1 } }
]
} }
])
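Two properties of $facet are easy to miss: every sub-pipeline receives the same input documents, and the stage emits a single document with one array field per facet. The plain JavaScript sketch below illustrates both. The function facet and the sample products are invented for illustration, and each sub-pipeline is represented by an ordinary function.

```javascript
// Plain JavaScript analogy of $facet.
function facet(docs, facets) {
  const out = {};
  for (const [name, subPipeline] of Object.entries(facets)) {
    out[name] = subPipeline(docs); // every facet starts from the same input
  }
  return [out]; // a single output document
}

const sampleProducts = [
  { name: "Phone", category: "Electronics", price: 500, sales: 30 },
  { name: "Laptop", category: "Electronics", price: 1500, sales: 10 },
  { name: "Desk", category: "Furniture", price: 300, sales: 20 }
];

const faceted = facet(sampleProducts, {
  // Like the priceStats sub-pipeline: $match, then $group with $avg and $max
  priceStats: docs => {
    const prices = docs
      .filter(d => d.category === "Electronics")
      .map(d => d.price);
    return [{
      _id: null,
      avgPrice: prices.reduce((a, b) => a + b, 0) / prices.length,
      maxPrice: Math.max(...prices)
    }];
  },
  // Like the topSellers sub-pipeline: $sort, $limit, $project
  topSellers: docs =>
    [...docs]
      .sort((a, b) => b.sales - a.sales)
      .slice(0, 5)
      .map(({ name, sales }) => ({ name, sales }))
});
console.log(JSON.stringify(faceted, null, 2));
```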
$lookup Stage
$lookup performs a left outer join against another collection, similar to a SQL join:
db.orders.aggregate([
{ $lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customerInfo"
} },
{ $unwind: "$customerInfo" },
{ $project: {
orderId: 1,
amount: 1,
customerName: "$customerInfo.name"
} }
])
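One consequence of pairing $lookup with $unwind is that orders without a matching customer disappear from the output. The plain JavaScript sketch below shows why. It is a simplified analogy using strict equality on top-level fields, and the function name lookup and the sample data are invented for illustration.

```javascript
// Plain JavaScript analogy of $lookup (simplified: strict equality only).
function lookup(docs, foreignDocs, localField, foreignField, as) {
  return docs.map(doc => ({
    ...doc,
    [as]: foreignDocs.filter(f => f[foreignField] === doc[localField])
  }));
}

const sampleCustomers = [{ _id: 1, name: "Ada" }];
const ordersToJoin = [
  { orderId: "o1", amount: 40, customerId: 1 },
  { orderId: "o2", amount: 15, customerId: 2 } // no matching customer
];

// Left outer join: both orders survive, the second with an empty array.
const joined = lookup(ordersToJoin, sampleCustomers, "customerId", "_id", "customerInfo");

// Default $unwind behavior: documents with an empty array are dropped.
const joinedAndUnwound = joined.flatMap(doc =>
  doc.customerInfo.map(c => ({ ...doc, customerInfo: c }))
);
console.log(joined.length, joinedAndUnwound.length);
```

To keep unmatched orders in a real pipeline, the stage can be written as { $unwind: { path: "$customerInfo", preserveNullAndEmptyArrays: true } }.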
$graphLookup Stage
$graphLookup recursively searches documents, which makes it suitable for hierarchical data:
db.employees.aggregate([
{ $match: { name: "John Doe" } },
{ $graphLookup: {
from: "employees",
startWith: "$reportsTo",
connectFromField: "reportsTo",
connectToField: "_id",
as: "managementChain",
depthField: "level"
} }
])
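The traversal performed by this stage can be pictured with the plain JavaScript sketch below. The function managementChain and the three sample employees are invented for illustration. The sketch returns managers in traversal order, whereas MongoDB does not guarantee any order for the resulting array, so the level value from depthField is what identifies each manager's distance.

```javascript
// Plain JavaScript analogy of the $graphLookup above for one start document.
function managementChain(employees, start) {
  const byId = new Map(employees.map(e => [e._id, e]));
  const chain = [];
  const seen = new Set();
  let frontier = [start.reportsTo]; // startWith: "$reportsTo"
  let level = 0;                    // depthField values start at 0
  while (frontier.length > 0) {
    const next = [];
    for (const id of frontier) {
      const match = byId.get(id);   // connectToField: "_id"
      if (!match || seen.has(id)) continue; // no match, or already visited
      seen.add(id);
      chain.push({ ...match, level });
      if (match.reportsTo !== undefined && match.reportsTo !== null) {
        next.push(match.reportsTo); // connectFromField: "reportsTo"
      }
    }
    frontier = next;
    level += 1;
  }
  return chain;
}

const sampleEmployees = [
  { _id: 1, name: "Chief Executive" },
  { _id: 2, name: "Vice President", reportsTo: 1 },
  { _id: 3, name: "John Doe", reportsTo: 2 }
];
const johnsChain = managementChain(sampleEmployees, sampleEmployees[2]);
console.log(johnsChain.map(m => `${m.level}: ${m.name}`));
```

In a real pipeline, the maxDepth option limits how far the recursion goes.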