阿里云主机折上折
  • 微信号
Current Site:Index > Data change monitoring (Change Streams)

Data change monitoring (Change Streams)

Author:Chuan Chen 阅读数:41969人阅读 分类: MongoDB

Data Change Monitoring (Change Streams)

Mongoose's Change Streams feature allows developers to monitor change events in MongoDB collections. When documents are inserted, updated, deleted, or replaced, Change Streams push these changes in real time, making it ideal for scenarios requiring real-time data synchronization.

Basic Usage of Change Streams

In Mongoose, you can enable Change Streams monitoring using the watch() method. Here’s a basic example:

const mongoose = require('mongoose');
const Schema = mongoose.Schema;

const userSchema = new Schema({
  name: String,
  age: Number,
  email: String
});

const User = mongoose.model('User', userSchema);

// Connect to MongoDB
mongoose.connect('mongodb://localhost:27017/test');

// Monitor changes in the User collection
const changeStream = User.watch();

changeStream.on('change', (change) => {
  console.log('Change detected:', change);
});

When any modification is made to the User collection, the console will output the change information. For example, inserting a new document:

User.create({ name: '张三', age: 25, email: 'zhangsan@example.com' });

This triggers a change event, outputting something like:

{
  "_id": { "_data": "8262..." },
  "operationType": "insert",
  "fullDocument": {
    "_id": "5f8b...",
    "name": "张三",
    "age": 25,
    "email": "zhangsan@example.com",
    "__v": 0
  },
  "ns": { "db": "test", "coll": "users" },
  "documentKey": { "_id": "5f8b..." }
}

Types of Change Events

Change Streams can monitor various types of change events, including:

  1. insert: Triggered when a document is inserted.
  2. update: Triggered when a document is updated.
  3. delete: Triggered when a document is deleted.
  4. replace: Triggered when a document is replaced.
  5. invalidate: Triggered when the change stream becomes invalid due to certain reasons.

You can check the operationType property to determine the type of change:

changeStream.on('change', (change) => {
  switch(change.operationType) {
    case 'insert':
      console.log('New document inserted:', change.fullDocument);
      break;
    case 'update':
      console.log('Document updated:', change.documentKey, change.updateDescription);
      break;
    case 'delete':
      console.log('Document deleted:', change.documentKey);
      break;
    case 'replace':
      console.log('Document replaced:', change.fullDocument);
      break;
  }
});

Filtering Specific Changes

Sometimes you only care about changes that meet specific conditions. You can filter them by passing a pipeline parameter:

const pipeline = [
  { $match: { 'fullDocument.age': { $gte: 18 } } },
  { $match: { operationType: 'insert' } }
];

const changeStream = User.watch(pipeline);

This ensures the change event is only triggered when the inserted document's age field is 18 or older.

Change Details

For update operations, the change object includes an updateDescription field detailing which fields were modified:

User.updateOne({ name: '张三' }, { $set: { age: 26 } });

The corresponding change event will include:

{
  "updateDescription": {
    "updatedFields": {
      "age": 26
    },
    "removedFields": []
  }
}

Resuming Change Streams

Change Streams support resuming monitoring from a specific point in time, which is useful after an application restart:

// Save the last change token
let resumeToken;

changeStream.on('change', (change) => {
  resumeToken = change._id;
});

// After application restart
const newChangeStream = User.watch([], { resumeAfter: resumeToken });

Advanced Configuration Options

The watch() method accepts several configuration options:

const options = {
  fullDocument: 'updateLookup', // Returns the full document for update operations
  batchSize: 100,              // Maximum number of documents per batch
  maxAwaitTimeMS: 5000         // Maximum time to wait for new changes
};

const changeStream = User.watch([], options);

Practical Use Cases

  1. Real-time notification systems: Push notifications when users receive new messages.
  2. Data synchronization: Maintain consistency between different systems.
  3. Audit logging: Record all data changes for auditing purposes.
  4. Cache invalidation: Invalidate relevant caches when data changes.

Performance Considerations

While Change Streams are powerful, keep the following in mind:

  1. They add additional load to the database.
  2. High-frequency changes need to be handled properly.
  3. Network latency and reconnection mechanisms should be considered.
  4. Backpressure mechanisms may be needed to prevent consumer overload.

Error Handling

Always handle error events properly:

changeStream.on('error', (err) => {
  console.error('Change stream error:', err);
  // Implement reconnection logic
});

Integration with WebSocket Example

Here’s an example of integrating Change Streams with WebSocket:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  const changeStream = User.watch();
  
  changeStream.on('change', (change) => {
    ws.send(JSON.stringify(change));
  });
  
  ws.on('close', () => {
    changeStream.close();
  });
});

Limitations and Notes

  1. Requires a MongoDB replica set or sharded cluster.
  2. Proper permission configuration is necessary.
  3. Long-running change streams may consume significant memory.
  4. Certain operations like collection deletion do not trigger change events.

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

Front End Chuan

Front End Chuan, Chen Chuan's Code Teahouse 🍵, specializing in exorcising all kinds of stubborn bugs 💻. Daily serving baldness-warning-level development insights 🛠️, with a bonus of one-liners that'll make you laugh for ten years 🐟. Occasionally drops pixel-perfect romance brewed in a coffee cup ☕.