Kafka Connector and Data Integration
Kafka Connector and MongoDB Data Integration Basics
Kafka Connect is the component of the Apache Kafka ecosystem used to integrate external data systems. Connectors come in two types: Source Connectors (which pull data from an external system into Kafka) and Sink Connectors (which push data from Kafka into an external system). In MongoDB data integration scenarios, the MongoDB Kafka Connector enables bidirectional data flow between MongoDB and Kafka.
MongoDB is a document-oriented database; pairing its flexible data model with Kafka's high-throughput messaging system makes it possible to build powerful real-time data pipelines. Typical application scenarios include:
- Real-time synchronization of MongoDB change streams to Kafka topics
- Writing JSON messages from Kafka into MongoDB collections
- Decoupling data exchange between microservices
MongoDB as a Kafka Data Source
Using Kafka Connect's MongoDB Source Connector, data can be read from MongoDB and written to Kafka topics. Key parameters to specify in the configuration include:
{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "inventory",
    "collection": "products",
    "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}]",
    "topic.prefix": "mongo"
  }
}
This configuration monitors insert operations on the inventory.products collection and publishes change events to the mongo.inventory.products topic. The connector supports the full change stream pipeline syntax, enabling complex event filtering and transformation.
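A connector configuration like this is registered with a running Kafka Connect worker through its REST API. The sketch below is a minimal example, assuming a worker reachable at connect-host:8083 and Node.js 18+ (for the built-in fetch); the host, credentials, and config values are placeholders to adapt.

// register-source.js -- minimal sketch; Connect host and credentials are placeholders
const connectorConfig = {
  name: "mongo-source-connector",
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "inventory",
    "collection": "products",
    "topic.prefix": "mongo"
  }
};

async function main() {
  // POST /connectors creates the connector on the Connect worker
  const res = await fetch("http://connect-host:8083/connectors", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(connectorConfig)
  });
  console.log(res.status, await res.json());
}

main().catch(console.error);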
Writing Kafka Data to MongoDB
The MongoDB Sink Connector writes data from Kafka topics into MongoDB collections. Here is a typical configuration example:
{
  "name": "mongo-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://user:password@host:27017",
    "database": "analytics",
    "collection": "user_events",
    "topics": "user.tracking.events",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.partial.value.projection.list": "_id",
    "document.id.strategy.partial.value.projection.type": "allowlist"
  }
}
This configuration writes JSON messages from the user.tracking.events topic into the analytics.user_events collection, using the _id field from the message value as the document's primary key. Setting value.converter.schemas.enable to false lets the JsonConverter accept plain JSON values without a schema envelope.
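To exercise the sink path end to end, a producer only needs to publish JSON messages that carry an _id field. The sketch below uses the kafkajs client (an assumption; the article does not name a producer library) with a placeholder broker address; with the configuration above, the sink connector would write this record into analytics.user_events keyed on its _id.

// produce-event.js -- kafkajs producer sketch; broker address and payload are examples
const { Kafka } = require("kafkajs");

async function main() {
  const kafka = new Kafka({ clientId: "event-producer", brokers: ["kafka:9092"] });
  const producer = kafka.producer();
  await producer.connect();

  await producer.send({
    topic: "user.tracking.events",
    messages: [
      {
        key: "user-42",
        // _id becomes the MongoDB document _id via PartialValueStrategy
        value: JSON.stringify({ _id: "evt-1001", userId: 42, action: "page_view", ts: Date.now() })
      }
    ]
  });

  await producer.disconnect();
}

main().catch(console.error);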
Change Data Capture (CDC) Implementation
MongoDB's Change Stream feature combined with Kafka Connect enables efficient CDC solutions:
// Change stream pipeline example
[
  {
    "$match": {
      "$or": [
        { "operationType": "insert" },
        { "operationType": "update" },
        { "operationType": "replace" }
      ]
    }
  },
  {
    "$project": {
      "_id": 1,
      "fullDocument": 1,
      "ns": 1,
      "documentKey": 1,
      "operationType": 1,
      "updateDescription": 1
    }
  }
]
This pipeline captures insert, update, and replace operations and extracts the key fields from each change event. The Connector converts these change events into Kafka messages, which typically include the following (a driver-level sketch of the same pipeline follows the list):
- Operation type (insert/update/replace)
- Document ID
- Full document or changed fields
- Timestamp
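For comparison, the same pipeline can be consumed directly with the MongoDB Node.js driver, which is roughly what the Source Connector does internally. A minimal sketch, assuming the mongodb driver (v4+) and the placeholder connection string used earlier:

// watch-products.js -- direct change stream sketch; URI is a placeholder
const { MongoClient } = require("mongodb");

const pipeline = [
  { $match: { operationType: { $in: ["insert", "update", "replace"] } } },
  { $project: { _id: 1, fullDocument: 1, ns: 1, documentKey: 1, operationType: 1, updateDescription: 1 } }
];

async function main() {
  const client = new MongoClient("mongodb://user:password@host:27017");
  await client.connect();

  const stream = client.db("inventory").collection("products")
    .watch(pipeline, { fullDocument: "updateLookup" });

  // Each change event carries the fields projected above
  for await (const change of stream) {
    console.log(change.operationType, change.documentKey, change.fullDocument);
  }
}

main().catch(console.error);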
Data Transformation and Processing
Kafka Connect provides Single Message Transform (SMT) functionality to transform data before it enters Kafka or is written to MongoDB:
{
  "transforms": "unwrap,formatTS",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.formatTS.target.type": "string",
  "transforms.formatTS.field": "timestamp",
  "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss"
}
This transformation configuration:
- Unwraps Debezium-formatted change events, keeping only the new record state
- Converts the timestamp field into a string in the specified format, as the before/after sketch following this list illustrates
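As an illustration of the expected shapes (hand-written example data, not output from actually running the SMT chain; the Debezium envelope is simplified):

// Simplified Debezium-style change event before the SMT chain
const beforeTransforms = {
  before: null,
  after: { orderId: "o-1", total: 99.5, timestamp: 1700000000000 },
  source: { connector: "mongodb", db: "shop", collection: "orders" },
  op: "c",
  ts_ms: 1700000000123
};

// After ExtractNewRecordState + TimestampConverter, only the new state remains
// and the epoch-millis timestamp is formatted as a string (UTC shown here)
const afterTransforms = {
  orderId: "o-1",
  total: 99.5,
  timestamp: "2023-11-14 22:13:20"
};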
Error Handling and Retry Mechanisms
In production environments, robust error handling strategies are essential:
errors.tolerance: all
errors.log.enable: true
errors.log.include.messages: true
errors.deadletterqueue.topic.name: mongo_dlq
errors.deadletterqueue.context.headers.enable: true
retry.backoff.ms: 1000
max.retries: 5
These parameters enable the Connector to:
- Tolerate record-level conversion and transformation errors without stopping the task
- Log error details, including the failing messages
- Route failed records to a dead-letter queue topic (a consumer sketch for inspecting the queue follows this list)
- Automatically retry failed operations with a backoff
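To see why records ended up in the dead-letter queue, a consumer can read the mongo_dlq topic along with the error-context headers that Connect attaches when errors.deadletterqueue.context.headers.enable is true. A minimal sketch using the kafkajs v2 client (an assumption; any Kafka client works), with a placeholder broker address:

// inspect-dlq.js -- kafkajs consumer sketch; broker address is a placeholder
const { Kafka } = require("kafkajs");

async function main() {
  const kafka = new Kafka({ clientId: "dlq-inspector", brokers: ["kafka:9092"] });
  const consumer = kafka.consumer({ groupId: "dlq-inspector" });

  await consumer.connect();
  await consumer.subscribe({ topics: ["mongo_dlq"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }) => {
      // Connect's DLQ headers (prefixed __connect.errors.) describe the failure cause
      const headers = Object.fromEntries(
        Object.entries(message.headers || {}).map(([k, v]) => [k, v && v.toString()])
      );
      console.log(headers, message.value && message.value.toString());
    }
  });
}

main().catch(console.error);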
Performance Optimization Configuration
Performance tuning recommendations for high-traffic scenarios:
tasks.max=4
batch.size=1000
max.num.producer.requests=50
linger.ms=100
buffer.memory=33554432
Corresponding MongoDB client optimizations:
// Node.js driver (v4+) option names: maxPoolSize/wtimeoutMS replace the legacy poolSize/wtimeout
const { MongoClient } = require("mongodb");

// uri: the same connection string used in the connector configuration
const client = new MongoClient(uri, {
  maxPoolSize: 10,
  w: "majority",
  wtimeoutMS: 5000,
  readConcern: { level: "local" },
  readPreference: "secondaryPreferred"
});
Monitoring and Operations
An effective monitoring solution should include:
- Kafka Connect worker and connector metrics (exposed via JMX; the example assumes a Prometheus JMX exporter serving them over HTTP):
curl -s http://connect-host:8083/metrics | grep connector_
- MongoDB change stream latency monitoring:
db.getSiblingDB('local').getCollection('oplog.rs').find().sort({ $natural: -1 }).limit(1)
- Custom metric collection:
from prometheus_client import start_http_server, Gauge

# Expose a gauge on :8000; the collection logic should call mongo_lag.set(<lag>) periodically
mongo_lag = Gauge('mongo_connector_lag', 'Consumer group lag in messages')
start_http_server(8000)
Security Configuration Practices
Production environment security configuration example:
{
  "connection.uri": "mongodb+srv://user:password@cluster0.example.com/?tls=true&authSource=admin",
  "ssl.enabled": "true",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "changeit",
  "producer.override.security.protocol": "SSL",
  "producer.override.ssl.endpoint.identification.algorithm": ""
}
Corresponding MongoDB Atlas connection configuration:
{
  connectionString: "mongodb+srv://<user>:<password>@cluster0.example.com/test?retryWrites=true&w=majority",
  tls: true,
  tlsAllowInvalidCertificates: false,
  authMechanism: "SCRAM-SHA-256"
}
Practical Application Case
E-commerce order processing pipeline example:
- Order service writes orders to MongoDB
- Source Connector captures order changes
- Order events enter the Kafka orders topic
- Multiple consumers process events:
// Payment service consumer (simplified pseudocode; kafka.consume stands in for a real client subscription)
kafka.consume('orders', (message) => {
  const order = JSON.parse(message.value);
  if (order.status === 'CREATED') {
    processPayment(order); // payment handling implemented by the payment service
  }
});

// Inventory service consumer (same simplified subscription API)
kafka.consume('orders', (message) => {
  const order = JSON.parse(message.value);
  if (order.status === 'PAID') {
    updateInventory(order.items); // stock adjustment implemented by the inventory service
  }
});
- Sink Connector writes processing results back to MongoDB:
{
  "collection": "order_audit",
  "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy"
}