
Overview of Aggregation Pipeline

Author: Chuan Chen | Views: 22,409 | Category: MongoDB

The aggregation pipeline is MongoDB's framework for processing documents in a sequence of stages, enabling complex transformations and analysis by chaining operations together. Each stage receives the output of the previous stage as input and passes its own output documents to the next stage.

Basic Structure of the Aggregation Pipeline

The aggregation pipeline consists of multiple stages, each performing specific operations on the data. A typical pipeline structure looks like this:

db.collection.aggregate([
  { $match: { status: "A" } },
  { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
  { $sort: { total: -1 } }
])

This example shows a three-stage pipeline:

  1. The $match stage filters documents with status "A"
  2. The $group stage groups by cust_id and calculates the sum of amount
  3. The $sort stage sorts the results by total in descending order
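To make the data flow concrete, here is a minimal plain JavaScript sketch that mimics what the three stages compute on an in-memory array. It does not call MongoDB, and the sample orders are hypothetical data invented for illustration.

```javascript
// Hypothetical sample documents (not from the original article).
const orders = [
  { cust_id: "c1", amount: 50, status: "A" },
  { cust_id: "c2", amount: 200, status: "A" },
  { cust_id: "c1", amount: 25, status: "A" },
  { cust_id: "c3", amount: 75, status: "B" },
];

// Stage 1 ($match): keep only documents with status "A".
const matched = orders.filter((doc) => doc.status === "A");

// Stage 2 ($group): sum amount per cust_id.
const totals = new Map();
for (const doc of matched) {
  totals.set(doc.cust_id, (totals.get(doc.cust_id) ?? 0) + doc.amount);
}
const grouped = [...totals].map(([_id, total]) => ({ _id, total }));

// Stage 3 ($sort): order by total, descending.
const sorted = [...grouped].sort((a, b) => b.total - a.total);

console.log(sorted);
```

Each intermediate array plays the role of the document stream handed from one stage to the next.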

Detailed Explanation of Common Aggregation Stages

$match Stage

$match is used to filter documents, similar to query conditions in the find() method. It can reduce the number of documents processed in subsequent stages, improving performance.

{ $match: { 
  age: { $gt: 18 },
  department: "engineering",
  joinDate: { $gte: new Date("2020-01-01") }
} }
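As a rough illustration of the filter semantics, the sketch below applies the same three conditions to an in-memory array in plain JavaScript. The employee records are hypothetical, and the code only mirrors the logic; it does not run against a database.

```javascript
// Hypothetical employee documents used only for illustration.
const employees = [
  { name: "Ana", age: 30, department: "engineering", joinDate: new Date("2021-03-15") },
  { name: "Ben", age: 17, department: "engineering", joinDate: new Date("2022-06-01") },
  { name: "Cy", age: 41, department: "sales", joinDate: new Date("2020-09-10") },
  { name: "Di", age: 35, department: "engineering", joinDate: new Date("2019-11-20") },
];

const cutoff = new Date("2020-01-01");

// All three conditions must hold, mirroring the implicit AND between $match fields.
const matched = employees.filter(
  (e) => e.age > 18 && e.department === "engineering" && e.joinDate >= cutoff
);

console.log(matched.map((e) => e.name));
```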

$group Stage

$group is the core stage of the aggregation pipeline, used for grouping documents and computing aggregated values. It requires an _id expression as the grouping key; use _id: null to aggregate all input documents into a single group.

{ $group: {
  _id: "$department",
  averageSalary: { $avg: "$salary" },
  maxAge: { $max: "$age" },
  employeeCount: { $sum: 1 }
} }
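One way to picture the grouping above is as a map from each department to running state that is updated once per document. The following plain JavaScript sketch assumes a small hypothetical staff array and reproduces the same three output fields.

```javascript
// Hypothetical input documents.
const staff = [
  { department: "engineering", salary: 6000, age: 30 },
  { department: "engineering", salary: 8000, age: 45 },
  { department: "sales", salary: 5000, age: 28 },
];

// Keep running state per grouping key, as $group does per distinct _id value.
const state = new Map();
for (const doc of staff) {
  const s = state.get(doc.department) ?? { sum: 0, count: 0, maxAge: -Infinity };
  s.sum += doc.salary;
  s.count += 1;
  s.maxAge = Math.max(s.maxAge, doc.age);
  state.set(doc.department, s);
}

// Turn the running state into one output document per group.
const result = [...state].map(([department, s]) => ({
  _id: department,
  averageSalary: s.sum / s.count,
  maxAge: s.maxAge,
  employeeCount: s.count,
}));

console.log(result);
```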

$project Stage

$project reshapes document structures, allowing inclusion, exclusion, or renaming of fields, as well as creating computed fields.

{ $project: {
  name: 1,
  department: 1,
  monthlySalary: { $divide: ["$salary", 12] },
  _id: 0
} }
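For a single document, the projection above amounts to building a new object with only the listed fields. A minimal sketch in plain JavaScript, assuming a hypothetical input document:

```javascript
// Hypothetical input document.
const doc = { _id: 1, name: "Ana", department: "engineering", salary: 72000, age: 30 };

// Keep name and department, add a computed field, and leave out _id and everything else.
const projected = {
  name: doc.name,
  department: doc.department,
  monthlySalary: doc.salary / 12,
};

console.log(projected);
```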

$sort Stage

$sort sorts documents, where 1 indicates ascending order and -1 indicates descending order.

{ $sort: { 
  department: 1,
  salary: -1 
} }
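A compound sort like this compares on the first key and falls back to the next key only on ties. The sketch below shows the same ordering in plain JavaScript with hypothetical rows; it uses simple string comparison and does not reproduce MongoDB's collation rules.

```javascript
// Hypothetical rows.
const rows = [
  { department: "sales", salary: 5000 },
  { department: "engineering", salary: 6000 },
  { department: "engineering", salary: 8000 },
];

// Three-way comparison for plain strings.
const compareStrings = (a, b) => (a < b ? -1 : a > b ? 1 : 0);

// department ascending first; ties are broken by salary descending.
const sorted = [...rows].sort(
  (a, b) => compareStrings(a.department, b.department) || b.salary - a.salary
);

console.log(sorted);
```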

$limit and $skip Stages

These stages are used for pagination, typically after a $sort so that page boundaries are deterministic:

{ $skip: 20 },
{ $limit: 10 }
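The skip value is usually derived from a page number and a page size. The helper below is a hypothetical sketch (pageStages is not a MongoDB API) that builds the two stages for a 1-based page number.

```javascript
// Hypothetical helper: build the $skip/$limit stages for a 1-based page number.
function pageStages(page, pageSize) {
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError("page must be an integer >= 1");
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError("pageSize must be an integer >= 1");
  }
  return [{ $skip: (page - 1) * pageSize }, { $limit: pageSize }];
}

// Page 3 with 10 documents per page skips the first 20 documents.
const stages = pageStages(3, 10);
console.log(JSON.stringify(stages)); // [{"$skip":20},{"$limit":10}]
```

The returned array can be spread into a larger pipeline after its $sort stage.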

Advanced Aggregation Operations

Array Operations

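MongoDB provides several operators for working with arrays. $unwind is a pipeline stage, while $addToSet and $push are accumulators used inside $group:

{ $unwind: "$tags" },  // Stage: emits one document per element of the tags array
{ $group: {
  _id: "$userId",  // Grouping key chosen for illustration
  skills: { $addToSet: "$skills" },  // Accumulator: collects distinct values per group
  comments: { $push: "$newComment" }  // Accumulator: appends every value to an array
} }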
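To show what expanding an array into multiple documents means, the sketch below mimics $unwind on a tags field in plain JavaScript. The posts are hypothetical sample data.

```javascript
// Hypothetical documents with an array field.
const posts = [
  { _id: 1, title: "Intro", tags: ["mongodb", "aggregation"] },
  { _id: 2, title: "Indexes", tags: ["mongodb"] },
  { _id: 3, title: "Draft", tags: [] },
];

// One output document per array element; the array field is replaced by that element.
// A document with an empty array yields no output, matching $unwind's default behavior.
const unwound = posts.flatMap((post) =>
  post.tags.map((tag) => ({ ...post, tags: tag }))
);

console.log(unwound);
```

Post 3 disappears from the output; in MongoDB the preserveNullAndEmptyArrays option of $unwind keeps such documents.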

Conditional Expressions

Conditional logic can be used in aggregations:

{ $project: {
  name: 1,
  bonus: {
    $cond: {
      if: { $gte: ["$sales", 10000] },
      then: 1000,
      else: 0
    }
  }
} }
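The $cond expression behaves like a ternary evaluated per document. A minimal plain JavaScript sketch with hypothetical sales figures, including the boundary value:

```javascript
// Hypothetical sales representatives.
const reps = [
  { name: "Ana", sales: 12000 },
  { name: "Ben", sales: 10000 },
  { name: "Cy", sales: 9999 },
];

// bonus is 1000 when sales >= 10000, otherwise 0 (same rule as the $cond above).
const withBonus = reps.map((r) => ({
  name: r.name,
  bonus: r.sales >= 10000 ? 1000 : 0,
}));

console.log(withBonus);
```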

Date Handling

MongoDB provides date operators that extract parts of a date, evaluated in UTC unless a timezone is specified:

{ $project: {
  year: { $year: "$joinDate" },
  month: { $month: "$joinDate" },
  dayOfWeek: { $dayOfWeek: "$joinDate" }
} }
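The numbering conventions differ from JavaScript's Date methods, which is an easy source of off-by-one errors. The sketch below, using a hypothetical date, shows the adjustments needed to reproduce the values of $year, $month, and $dayOfWeek.

```javascript
// Hypothetical join date: 4 June 2023 is a Sunday (UTC).
const joinDate = new Date("2023-06-04T10:00:00Z");

const parts = {
  year: joinDate.getUTCFullYear(),
  // JavaScript months are 0-based; $month returns 1 to 12.
  month: joinDate.getUTCMonth() + 1,
  // JavaScript uses 0 for Sunday; $dayOfWeek returns 1 (Sunday) to 7 (Saturday).
  dayOfWeek: joinDate.getUTCDay() + 1,
};

console.log(parts); // { year: 2023, month: 6, dayOfWeek: 1 }
```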

Performance Optimization Tips

Index Utilization

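Help $match and $sort stages use indexes:

  • Place $match at the beginning of the pipeline so it can use an index like a regular query
  • Filter with $match before sorting to reduce the number of documents to sort
  • Index the sort fields, and keep $sort ahead of stages such as $project, $unwind, or $group, since a sort that follows them cannot use an index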
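These rules of thumb can be checked mechanically before a pipeline is sent to the server. The function below is a hypothetical heuristic (sortMayUseIndex is not a MongoDB API): it only inspects stage order and cannot tell whether a suitable index exists, so the explain output of the real query remains the authoritative answer.

```javascript
// Heuristic only: returns true when every stage before the first $sort is a $match,
// so the sort is not preceded by a stage that reshapes or regroups documents.
function sortMayUseIndex(pipeline) {
  for (const stage of pipeline) {
    const operator = Object.keys(stage)[0];
    if (operator === "$sort") return true;
    if (operator !== "$match") return false;
  }
  return false; // the pipeline contains no $sort stage
}

console.log(sortMayUseIndex([{ $match: { status: "A" } }, { $sort: { total: -1 } }])); // true
console.log(sortMayUseIndex([{ $project: { total: 1 } }, { $sort: { total: -1 } }])); // false
```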

Memory Limits

Each pipeline stage is limited to 100 MB of RAM by default. For large datasets:

  • Use the allowDiskUse option so stages can spill to temporary files on disk (MongoDB 6.0 and later do this by default)
  • Optimize the pipeline to reduce intermediate result size
  • Use $match and $project early to minimize data volume
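The allowDiskUse option is passed as the second argument to aggregate(). The sketch below wraps that call in a small helper; aggregateWithDiskUse and the stand-in collection are hypothetical and exist only so the example runs without a database. With a real mongosh collection the same call shape applies.

```javascript
// Works with any object exposing aggregate(pipeline, options), such as a mongosh collection.
function aggregateWithDiskUse(collection, pipeline) {
  return collection.aggregate(pipeline, { allowDiskUse: true });
}

// Hypothetical stand-in that records the options it receives.
const fakeCollection = {
  lastOptions: null,
  aggregate(pipeline, options) {
    this.lastOptions = options;
    return [];
  },
};

aggregateWithDiskUse(fakeCollection, [{ $sort: { total: -1 } }]);
console.log(fakeCollection.lastOptions); // { allowDiskUse: true }
```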

Sharded Cluster Considerations

On sharded clusters:

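  • A leading $match runs on the shards, and when it filters on the shard key only the shards that own matching data are targeted
  • $group typically computes partial results on each shard, then merges them in a second phase (on mongos or on a merging shard, depending on the version and pipeline)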
  • Consider using the $out stage to write results to a new collection

Practical Application Examples

E-commerce Data Analysis

db.orders.aggregate([
  { $match: { 
    orderDate: { 
      $gte: new Date("2023-01-01"),
      $lt: new Date("2023-02-01") 
    } 
  } },
  { $unwind: "$items" },
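  { $group: {
    _id: "$items.category",
    // Revenue per line item is price × quantity (assumes price is a unit price)
    totalSales: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
    avgQuantity: { $avg: "$items.quantity" },
    // After $unwind this counts line items, not distinct orders
    orderCount: { $sum: 1 }
  } },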
  { $sort: { totalSales: -1 } },
  { $limit: 5 }
])

User Behavior Analysis

db.userActivities.aggregate([
  { $match: { 
    timestamp: { $gte: new Date("2023-06-01") } 
  } },
  { $group: {
    _id: {
      userId: "$userId",
      action: "$actionType"
    },
    count: { $sum: 1 },
    lastOccurred: { $max: "$timestamp" }
  } },
  { $project: {
    userId: "$_id.userId",
    action: "$_id.action",
    count: 1,
    lastOccurred: 1,
    isFrequent: { $gt: ["$count", 10] }
  } },
  { $sort: { count: -1 } }
])

Pipeline Expressions and Operators

MongoDB provides a rich set of expressions and operators:

Arithmetic Operators

{ $project: {
  netPrice: { $subtract: ["$price", "$discount"] },
  tax: { $multiply: ["$price", 0.1] },
  finalPrice: { 
    $add: [
      { $subtract: ["$price", "$discount"] },
      { $multiply: ["$price", 0.1] }
    ]
  }
} }
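The same arithmetic written out for one document may make the nesting easier to read. This is a plain JavaScript sketch with a hypothetical item; note that, as in the projection above, tax is computed on the list price rather than on the discounted price.

```javascript
// Hypothetical item.
const item = { price: 200, discount: 20 };

const netPrice = item.price - item.discount; // $subtract
const tax = item.price * 0.1;                // $multiply
const finalPrice = netPrice + tax;           // $add of the two expressions

console.log({ netPrice, tax, finalPrice });
```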

String Operations

{ $project: {
  fullName: { $concat: ["$firstName", " ", "$lastName"] },
  emailDomain: { $arrayElemAt: [{ $split: ["$email", "@"] }, 1] },
  nameLength: { $strLenCP: "$name" }
} }
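For comparison, here are the intended results of the three string fields computed in plain JavaScript for a hypothetical user. The name deliberately contains an emoji to show why code point length ($strLenCP) can differ from JavaScript's UTF-16 based length property.

```javascript
// Hypothetical user document; \u{1F600} is a single code point outside the BMP.
const user = {
  firstName: "Ada",
  lastName: "Lovelace",
  email: "ada@example.com",
  name: "Ada \u{1F600}",
};

const fullName = `${user.firstName} ${user.lastName}`;
const emailDomain = user.email.split("@")[1]; // the part after "@"
const nameLength = [...user.name].length;     // counts code points, like $strLenCP

console.log(fullName, emailDomain, nameLength, user.name.length);
```

Here nameLength is 5 while user.name.length is 6, because the emoji occupies two UTF-16 code units.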

Aggregation Accumulators

{ $group: {
  _id: null,
  total: { $sum: "$value" },
  average: { $avg: "$value" },
  min: { $min: "$value" },
  max: { $max: "$value" },
  values: { $push: "$value" },
  uniqueValues: { $addToSet: "$value" }
} }
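The difference between $push and $addToSet is easiest to see with duplicate values. The following plain JavaScript sketch computes the same six fields for three hypothetical documents grouped into a single bucket (_id: null).

```javascript
// Hypothetical documents; the value 3 appears twice.
const docs = [{ value: 3 }, { value: 1 }, { value: 3 }];
const values = docs.map((d) => d.value);
const total = values.reduce((a, b) => a + b, 0);

const summary = {
  _id: null,
  total,
  average: total / values.length,
  min: Math.min(...values),
  max: Math.max(...values),
  values,                             // like $push: every value, duplicates included
  uniqueValues: [...new Set(values)], // like $addToSet: each distinct value once
};

console.log(summary);
```

MongoDB does not guarantee the order of elements produced by $addToSet, so code should not rely on it.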

Importance of Pipeline Stage Order

The order of stages in the aggregation pipeline significantly impacts results and performance:

  1. Use $match early to reduce document count
  2. Placing $sort before $group can help certain grouping operations, for example when $group uses $first or $last and an index supports the sort
  3. $project can be placed at different positions for different purposes:
    • Early to reduce field count
    • Late to reshape output format

Example of an inefficient order and an optimized one:

// Inefficient order
db.orders.aggregate([
  { $project: { items: 1 } },
  { $unwind: "$items" },
  { $match: { "items.price": { $gt: 100 } } }
])

// Optimized order
db.orders.aggregate([
  { $match: { "items.price": { $gt: 100 } } },
  { $unwind: "$items" },
  { $match: { "items.price": { $gt: 100 } } },
  { $project: { items: 1 } }
])
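The effect of the early $match can be estimated by counting how many documents reach the $unwind step. The plain JavaScript sketch below, using hypothetical orders, produces the same final result in both orders but unwinds far fewer documents when the filter runs first.

```javascript
// Hypothetical orders; only order 1 contains an item priced above 100.
const orders = [
  { _id: 1, items: [{ price: 150 }, { price: 20 }] },
  { _id: 2, items: [{ price: 30 }, { price: 40 }] },
  { _id: 3, items: [{ price: 10 }] },
];

// Mimics $unwind: "$items".
const unwind = (docs) =>
  docs.flatMap((d) => d.items.map((item) => ({ ...d, items: item })));
// Mimics the $match applied to unwound documents.
const isExpensive = (doc) => doc.items.price > 100;

// Late filter: every item of every order is unwound first.
const lateUnwound = unwind(orders);
const lateResult = lateUnwound.filter(isExpensive);

// Early filter: only orders with at least one matching item are unwound.
const preFiltered = orders.filter((o) => o.items.some((item) => item.price > 100));
const earlyUnwound = unwind(preFiltered);
const earlyResult = earlyUnwound.filter(isExpensive);

console.log(lateUnwound.length, earlyUnwound.length); // 5 2
```

The second $match is still needed after $unwind because the first one keeps whole orders, including their cheaper items.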

Special Stages and Features

$facet Stage

$facet runs multiple sub-pipelines over the same input documents within a single stage and returns one document with one array field per sub-pipeline:

db.products.aggregate([
  { $facet: {
    "priceStats": [
      { $match: { category: "Electronics" } },
      { $group: {
        _id: null,
        avgPrice: { $avg: "$price" },
        maxPrice: { $max: "$price" }
      } }
    ],
    "topSellers": [
      { $sort: { sales: -1 } },
      { $limit: 5 },
      { $project: { name: 1, sales: 1 } }
    ]
  } }
])
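The shape of the $facet output is often the surprising part: a single document whose fields hold the result arrays. The sketch below models each sub-pipeline as a plain JavaScript function over the same hypothetical product list.

```javascript
// Hypothetical products.
const products = [
  { name: "Phone", category: "Electronics", price: 600, sales: 90 },
  { name: "Laptop", category: "Electronics", price: 1200, sales: 40 },
  { name: "Desk", category: "Furniture", price: 300, sales: 70 },
];

// Sub-pipeline 1: average and maximum price of electronics.
const priceStats = (docs) => {
  const prices = docs.filter((d) => d.category === "Electronics").map((d) => d.price);
  if (prices.length === 0) return [];
  return [{
    _id: null,
    avgPrice: prices.reduce((a, b) => a + b, 0) / prices.length,
    maxPrice: Math.max(...prices),
  }];
};

// Sub-pipeline 2: top five products by sales, keeping only name and sales.
const topSellers = (docs) =>
  [...docs]
    .sort((a, b) => b.sales - a.sales)
    .slice(0, 5)
    .map(({ name, sales }) => ({ name, sales }));

// Both sub-pipelines read the same input; the output is one document.
const facetResult = [{ priceStats: priceStats(products), topSellers: topSellers(products) }];

console.log(JSON.stringify(facetResult, null, 2));
```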

$lookup Stage

$lookup performs a left outer join against another collection, similar to a SQL join:

db.orders.aggregate([
  { $lookup: {
    from: "customers",
    localField: "customerId",
    foreignField: "_id",
    as: "customerInfo"
  } },
  { $unwind: "$customerInfo" },
  { $project: {
    orderId: 1,
    amount: 1,
    customerName: "$customerInfo.name"
  } }
])
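The interaction between $lookup and the following $unwind deserves a closer look. In this plain JavaScript sketch with hypothetical customers and orders, the join always adds an array (possibly empty), and the unwind step then drops orders that found no customer.

```javascript
// Hypothetical collections.
const customers = [{ _id: "c1", name: "Ana" }, { _id: "c2", name: "Ben" }];
const orders = [
  { orderId: 1, amount: 50, customerId: "c1" },
  { orderId: 2, amount: 80, customerId: "c9" }, // no matching customer
];

// Like $lookup: attach an array of matching customers (empty when nothing matches).
const joined = orders.map((o) => ({
  ...o,
  customerInfo: customers.filter((c) => c._id === o.customerId),
}));

// Like $unwind followed by $project: one row per match; empty arrays yield no rows.
const rows = joined.flatMap((o) =>
  o.customerInfo.map((c) => ({
    orderId: o.orderId,
    amount: o.amount,
    customerName: c.name,
  }))
);

console.log(rows);
```

To keep unmatched orders in MongoDB, $unwind accepts the preserveNullAndEmptyArrays option.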

$graphLookup Stage

$graphLookup recursively follows references between documents, which suits hierarchical data such as reporting chains:

db.employees.aggregate([
  { $match: { name: "John Doe" } },
  { $graphLookup: {
    from: "employees",
    startWith: "$reportsTo",
    connectFromField: "reportsTo",
    connectToField: "_id",
    as: "managementChain",
    depthField: "level"
  } }
])
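The traversal can be pictured as repeatedly following reportsTo links until no further manager is found. The sketch below is a simplified plain JavaScript model with a hypothetical three-person hierarchy; it assumes each employee has at most one manager, whereas $graphLookup handles general graphs and does not guarantee the order of the resulting array.

```javascript
// Hypothetical hierarchy: John Doe reports to the VP, who reports to the CEO.
const employees = [
  { _id: 1, name: "CEO", reportsTo: null },
  { _id: 2, name: "VP", reportsTo: 1 },
  { _id: 3, name: "John Doe", reportsTo: 2 },
];
const byId = new Map(employees.map((e) => [e._id, e]));

// Follow reportsTo links upward, recording the recursion depth as `level`
// (depth starts at 0 for the first lookup, as with depthField).
function managementChain(start) {
  const chain = [];
  const seen = new Set(); // guards against cycles in the data
  let nextId = start.reportsTo;
  let level = 0;
  while (nextId != null && byId.has(nextId) && !seen.has(nextId)) {
    seen.add(nextId);
    const manager = byId.get(nextId);
    chain.push({ ...manager, level });
    nextId = manager.reportsTo;
    level += 1;
  }
  return chain;
}

const john = employees.find((e) => e.name === "John Doe");
console.log(managementChain(john).map((m) => `${m.level}: ${m.name}`));
```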
