
Kafka Connector and Data Integration

Author: Chuan Chen · Views: 33,868 · Category: MongoDB

Kafka Connector and MongoDB Data Integration Basics

Kafka Connector is a component in the Apache Kafka ecosystem used to connect external data systems. It is divided into two types: Source Connector (data source) and Sink Connector (data destination). In MongoDB data integration scenarios, Kafka Connector enables bidirectional data flow between MongoDB and Kafka.

As a document-oriented database, MongoDB offers a flexible data model; combined with Kafka's high-throughput messaging system, it enables powerful real-time data pipelines. Typical application scenarios include:

  • Real-time synchronization of MongoDB change streams to Kafka topics
  • Writing JSON messages from Kafka into MongoDB collections
  • Decoupling data between microservices
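Connectors of either type are deployed by submitting a JSON definition to a Kafka Connect cluster's REST API. The sketch below is an assumption-laden example: it presumes a Connect worker reachable at connect-host:8083 and Node.js 18+ for the built-in fetch; the connector definitions shown in later sections can be registered the same way.

// Register a connector definition with the Kafka Connect REST API (worker address assumed)
const CONNECT_URL = "http://connect-host:8083";

async function registerConnector(definition) {
  const res = await fetch(`${CONNECT_URL}/connectors`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(definition)
  });
  if (!res.ok) {
    throw new Error(`Failed to create connector: ${res.status} ${await res.text()}`);
  }
  return res.json();
}

// Usage: pass any of the source/sink definitions from the sections below
// registerConnector(mongoSourceDefinition).then(console.log, console.error);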

MongoDB as a Kafka Data Source

Using Kafka Connect's MongoDB Source Connector, data can be read from MongoDB and written to Kafka topics. Key parameters to specify in the configuration include:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "inventory",
    "collection": "products",
    "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}]",
    "topic.prefix": "mongo."
  }
}

This configuration monitors insert operations in the inventory.products collection and publishes change events to the mongo.inventory.products topic. The Connector supports full change stream pipeline syntax, enabling complex event filtering and transformation.
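For example, the pipeline can be narrowed to inserts of a single product category (the category field here is illustrative):

"pipeline": "[{\"$match\": {\"operationType\": \"insert\", \"fullDocument.category\": \"electronics\"}}]"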

Writing Kafka Data to MongoDB

The MongoDB Sink Connector writes data from Kafka topics into MongoDB collections. Here is a typical configuration example:

{
  "name": "mongo-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "analytics",
    "collection": "user_events",
    "topics": "user.tracking.events",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.partial.value.projection.list": "_id",
    "document.id.strategy.partial.value.projection.type": "allowlist"
  }
}

This configuration writes JSON messages from the user.tracking.events topic into the analytics.user_events collection, using the _id field from the message as the document primary key.
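Any Kafka producer can feed this sink with plain JSON (the configuration above sets value.converter.schemas.enable to false, so no schema envelope is required). Below is a minimal sketch; kafkajs as the client library, the broker address, and the event fields are all assumptions:

// Publish a test event to the sink's topic (kafkajs and broker address assumed)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({ clientId: "event-producer", brokers: ["kafka:9092"] });
const producer = kafka.producer();

async function sendEvent() {
  await producer.connect();
  await producer.send({
    topic: "user.tracking.events",
    messages: [{
      key: "u42",
      value: JSON.stringify({ _id: "evt-1001", userId: "u42", action: "page_view" })
    }]
  });
  await producer.disconnect();
}

sendEvent().catch(console.error);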

Change Data Capture (CDC) Implementation

MongoDB's Change Stream feature combined with Kafka Connect enables efficient CDC solutions:

// Change stream pipeline example
[
  {
    "$match": {
      "$or": [
        { "operationType": "insert" },
        { "operationType": "update" },
        { "operationType": "replace" }
      ]
    }
  },
  {
    "$project": {
      "_id": 1,
      "fullDocument": 1,
      "ns": 1,
      "documentKey": 1,
      "operationType": 1,
      "updateDescription": 1
    }
  }
]

This pipeline captures insert, update, and replace operations and extracts key information from the changed documents. The Connector converts these change events into Kafka messages, which typically include:

  • Operation type (insert/update/replace)
  • Document ID
  • Full document or changed fields
  • Timestamp
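An insert event emitted by the source connector might look roughly like this (values are illustrative and the resume token is abbreviated):

{
  "_id": { "_data": "8262..." },
  "operationType": "insert",
  "clusterTime": { "$timestamp": { "t": 1714557600, "i": 1 } },
  "ns": { "db": "inventory", "coll": "products" },
  "documentKey": { "_id": { "$oid": "662f0d3a9b1e8a4f12345678" } },
  "fullDocument": {
    "_id": { "$oid": "662f0d3a9b1e8a4f12345678" },
    "name": "Mechanical Keyboard",
    "price": 89.99
  }
}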

Data Transformation and Processing

Kafka Connect provides Single Message Transform (SMT) functionality to transform data before it enters Kafka or is written to MongoDB:

{
  "transforms": "unwrap,formatTS",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.formatTS.target.type": "string",
  "transforms.formatTS.field": "timestamp",
  "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss"
}

This transformation configuration:

  1. Unpacks Debezium-formatted change events
  2. Converts timestamp fields into strings with the specified format
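The effect on a record value can be pictured roughly as follows (a simplified Debezium-style envelope with illustrative fields):

// Before the transform chain: simplified Debezium change-event envelope
{ "before": null, "after": { "orderId": 1, "timestamp": 1714557600000 }, "op": "c" }

// After ExtractNewRecordState + TimestampConverter (rendered in UTC)
{ "orderId": 1, "timestamp": "2024-05-01 10:00:00" }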

Error Handling and Retry Mechanisms

In production environments, robust error handling strategies are essential:

errors.tolerance: all
errors.log.enable: true
errors.log.include.messages: true
errors.deadletterqueue.topic.name: mongo_dlq
errors.deadletterqueue.context.headers.enable: true
retry.backoff.ms: 1000
max.retries: 5

These parameters enable the Connector to:

  • Tolerate all errors without stopping the task
  • Log error details
  • Send failed messages to a dead-letter queue
  • Automatically retry failed operations
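Messages routed to the dead-letter queue can be inspected with an ordinary consumer. A minimal sketch follows, using kafkajs; the client library and broker address are assumptions:

// Read dead-lettered records and print the error context headers added by Kafka Connect
const { Kafka } = require("kafkajs");
const kafka = new Kafka({ clientId: "dlq-inspector", brokers: ["kafka:9092"] });

async function inspectDlq() {
  const consumer = kafka.consumer({ groupId: "dlq-inspectors" });
  await consumer.connect();
  await consumer.subscribe({ topic: "mongo_dlq", fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const headers = Object.fromEntries(
        Object.entries(message.headers || {}).map(([k, v]) => [k, v && v.toString()])
      );
      console.log("failed record:", message.value && message.value.toString(), headers);
    }
  });
}

inspectDlq().catch(console.error);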

Performance Optimization Configuration

Performance tuning recommendations for high-traffic scenarios:

tasks.max=4
batch.size=1000
max.num.producer.requests=50
linger.ms=100
buffer.memory=33554432
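When a single worker hosts several connectors, the Kafka client settings can also be overridden per connector, provided the worker's connector.client.config.override.policy permits it (the values below are illustrative):

{
  "producer.override.linger.ms": "100",
  "producer.override.batch.size": "65536",
  "consumer.override.max.poll.records": "500"
}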

Corresponding MongoDB client optimizations:

const { MongoClient } = require("mongodb");

// Node.js driver options (v4+ names: maxPoolSize replaces poolSize, wtimeoutMS replaces wtimeout)
const client = new MongoClient(uri, {
  maxPoolSize: 10,
  w: "majority",
  wtimeoutMS: 5000,
  readConcern: { level: "local" },
  readPreference: "secondaryPreferred"
});

Monitoring and Operations

An effective monitoring solution should include:

  1. Kafka Connect Worker metrics (e.g., via a Prometheus exporter endpoint, if one is configured):
curl -s http://connect-host:8083/metrics | grep connector_
  2. MongoDB change stream latency monitoring (the oplog lives in the local database):
db.getSiblingDB("local").oplog.rs.find().sort({ $natural: -1 }).limit(1)
  3. Custom metric collection:
from prometheus_client import start_http_server, Gauge

# Expose a lag gauge on port 8000; feed it from your own lag measurement
mongo_lag = Gauge('mongo_connector_lag', 'Consumer group lag in messages')
start_http_server(8000)
mongo_lag.set(0)  # replace 0 with the measured lag
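Connector and task health can also be polled from the Kafka Connect REST API's status endpoint. A minimal Node.js 18+ sketch (worker address and connector name as configured earlier):

// Alert when a connector or any of its tasks is not RUNNING
const CONNECT_URL = "http://connect-host:8083";

async function checkConnector(name) {
  const res = await fetch(`${CONNECT_URL}/connectors/${name}/status`);
  const status = await res.json();
  const failedTasks = status.tasks.filter((t) => t.state !== "RUNNING");
  if (status.connector.state !== "RUNNING" || failedTasks.length > 0) {
    console.error(`Connector ${name} unhealthy:`, JSON.stringify(status));
  }
  return status;
}

checkConnector("mongo-source-connector").catch(console.error);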

Security Configuration Practices

Production environment security configuration example:

{
  "connection.uri": "mongodb+srv://user:password@cluster0.example.com/?tls=true&authSource=admin",
  "ssl.enabled": "true",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "changeit",
  "producer.override.security.protocol": "SSL",
  "producer.override.ssl.endpoint.identification.algorithm": ""
}

Corresponding MongoDB Atlas connection configuration:

{
  connectionString: "mongodb+srv://<user>:<password>@cluster0.example.com/test?retryWrites=true&w=majority",
  tls: true,
  tlsAllowInvalidCertificates: false,
  authMechanism: "SCRAM-SHA-256"
}
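To keep credentials like these out of plain-text connector JSON, Kafka Connect config providers can resolve them at runtime. A sketch assuming FileConfigProvider and an illustrative secrets file path:

# Worker properties: enable the file config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector config then references keys stored in /opt/secrets/mongo.properties
"connection.uri": "${file:/opt/secrets/mongo.properties:connection.uri}"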

Practical Application Case

E-commerce order processing pipeline example:

  1. Order service writes orders to MongoDB
  2. Source Connector captures order changes
  3. Order events enter the Kafka orders topic
  4. Multiple consumers process events:
// Sketch using kafkajs as the client library (an assumption; the broker address is illustrative)
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'order-consumers', brokers: ['kafka:9092'] });

// Helper: subscribe a named consumer group to the orders topic
async function consumeOrders(groupId, handler) {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message }) => handler(JSON.parse(message.value.toString()))
  });
}

// Payment service consumer
consumeOrders('payment-service', (order) => {
  if (order.status === 'CREATED') processPayment(order);
}).catch(console.error);

// Inventory service consumer
consumeOrders('inventory-service', (order) => {
  if (order.status === 'PAID') updateInventory(order.items);
}).catch(console.error);
  5. Sink Connector writes processing results back to MongoDB:
{
  "collection": "order_audit",
  "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy"
}
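To complete the picture, step 2 could be driven by a source connector along these lines. The database and collection names are assumptions; topic.namespace.map (connector 1.4+) maps the namespace to the plain orders topic used above, and publish.full.document.only makes each message value the order document itself, matching the consumers' JSON.parse:

{
  "name": "orders-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "shop",
    "collection": "orders",
    "publish.full.document.only": "true",
    "topic.namespace.map": "{\"shop.orders\": \"orders\"}"
  }
}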

