Using Change Data Capture (CDC), you can track and propagate changes from
MongoDB to downstream consumers. This guide outlines how to ingest data from a
MongoDB replica set (`rs0`) into Materialize, specifically from the `items` and
`orders` collections in the `test` database.
```mermaid
flowchart LR
subgraph "Source"
Mongo[("MongoDB (Replica Set/Sharded Cluster)")]
end
subgraph "Streaming Ingestion: Debezium + Kafka"
Debezium["Debezium Connector"]
Kafka{{"Kafka Broker"}}
Schema["Schema Registry"]
end
subgraph "Materialize"
MZ[("Materialize")]
end
Mongo -- "Change stream oplog data" --> Debezium
Debezium -- "Writes Avro Events" --> Kafka
Schema -. "Validates Schemas" .- Kafka
Kafka -- "Consumes Topic" --> MZ
Schema -. "Decodes Data" .- MZ
```
The architecture consists of the following components:

- **Source component:** MongoDB
- **Streaming ingestion component:** Debezium + Kafka (with Schema Registry)
- **Materialize:** Materialize connects to Kafka to ingest data from MongoDB.
- **MongoDB:** Version 6.0 or later, deployed as a replica set or a sharded cluster.
- **Kafka cluster:** A running Kafka broker and Schema Registry (e.g., Confluent Platform or Redpanda).
- **Debezium Connect:** Version 2.5.4 or later. A Kafka Connect cluster with the Debezium MongoDB connector plugin installed.
{{< important >}} For Debezium to capture changes, your MongoDB deployment must be configured as a replica set or a sharded cluster. {{< /important >}}
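If you are not sure how your deployment is configured, MongoDB's `hello` command reports it: replica set members include a `setName` field, and the `mongos` router of a sharded cluster returns `msg: "isdbgrid"`. The following minimal Python sketch classifies a `hello` response. It assumes the `pymongo` driver for the live check, and the helper names `deployment_kind` and `check_deployment` are hypothetical.

```python
def deployment_kind(hello: dict) -> str:
    """Classify a MongoDB deployment from a `hello` command response (hypothetical helper)."""
    if hello.get("msg") == "isdbgrid":
        # A mongos router (sharded cluster) reports msg: "isdbgrid".
        return "sharded cluster"
    if "setName" in hello:
        # Replica set members report the name of their replica set.
        return "replica set"
    # Neither field present: a standalone server, which Debezium cannot capture from.
    return "standalone"


def check_deployment(uri: str) -> str:
    """Run `hello` against a live deployment (assumes pymongo is installed)."""
    # Imported lazily so deployment_kind() has no third-party dependency.
    from pymongo import MongoClient

    with MongoClient(uri) as client:
        return deployment_kind(client.admin.command("hello"))
```

For the deployment in this guide, `check_deployment("mongodb://host1:27017/?replicaSet=rs0")` would be expected to return `"replica set"`; a result of `"standalone"` means the server must be converted to a replica set before Debezium can capture changes.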
Create a user (e.g., `debezium_materialize_user`) with the permissions needed
to read from the database. Depending on the connector's `capture.scope`
property, the user needs the `read` role on a specific database or on all
databases.
This guide ingests the `items` and `orders` collections from the `test`
database, so the example uses a `capture.scope` of `database` and grants read
access to the `test` database only:
```javascript
db.getSiblingDB("test").createUser({
  user: "debezium_materialize_user",
  pwd: passwordPrompt(), // Or "<cleartext password>"
  roles: [
    { role: "read", db: "test" }
  ]
});
```
For simplicity, the user is created in the `test` database; however, you can
create the user in a different database. If you do, set `authSource` in the
connection string to that database.
The streaming layer requires Kafka, Schema Registry, and Kafka Connect with the
Debezium MongoDB connector. If you have not set up these services, see:

- Confluent Platform Quickstart (recommended for testing): Docker-based setup
  for Kafka, Schema Registry, and Kafka Connect.
- Debezium Tutorial: Example using Docker Compose.

For this tutorial, ensure your Debezium container allows outbound traffic to
your external MongoDB host.
Configure your Kafka Connect worker to use Avro serialization with Schema
Registry. Specifically, set the following environment variables (or the
equivalent properties) on your Connect worker:
| Property | Value |
|---|---|
| `KEY_CONVERTER` | `io.confluent.connect.avro.AvroConverter` |
| `VALUE_CONVERTER` | `io.confluent.connect.avro.AvroConverter` |
| `CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL` | `http://schema-registry:8081` |
| `CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL` | `http://schema-registry:8081` |
Register the MongoDB connector with the following configuration:
| Parameter | Description |
|---|---|
| `name` | A unique name for the connector instance. |
| `connector.class` | The Debezium MongoDB connector class, `"io.debezium.connector.mongodb.MongoDbConnector"`. |
| `mongodb.connection.string` | Your MongoDB connection string with the `debezium_materialize_user`. |
| `topic.prefix` | A unique prefix for Kafka topics. Topics are created as `<prefix>.<db>.<collection>`. We recommend that you do not change this value once it is set. See the official Debezium documentation for details. |
| `collection.include.list` | Comma-separated list of collections to capture in the format `<db>.<collection>`, e.g., `"test.orders,test.items"`. |
| `capture.mode` | <red>Must</red> be `"change_streams_update_full"` (the default). This captures the full document state on updates, which is required for Materialize's `UPSERT` envelope. |
| `capture.scope` | The scope of the change stream: `"database"` or `"deployment"`. This value affects the required MongoDB user permissions. |
| `capture.target` | The database to monitor for changes. Required only when `capture.scope` is `"database"`. |
| `transforms` | Optional. Set to `"unwrap"` to extract the document state from Debezium's change event envelope. |
| `transforms.unwrap.type` | Optional. Set to `"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"` to use the MongoDB-specific unwrap transform. |
{{< important >}}
The capture.mode must be change_streams_update_full, the default. This
forces Debezium to send the entire document state for every change, which allows
Materialize to use the UPSERT envelope.
{{< /important >}}
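Before registering the connector, you can check its configuration against the rules above. The Python sketch below is a hypothetical helper, not part of Debezium or Materialize. It checks `connector.class`, `capture.mode`, and `topic.prefix`, and, when `capture.scope` is `"database"`, that `capture.target` is set and every `collection.include.list` entry belongs to that database. It treats the list entries as literal `<db>.<collection>` names, although Debezium interprets them as regular expressions.

```python
import json

MONGODB_CONNECTOR = "io.debezium.connector.mongodb.MongoDbConnector"


def validate_connector_config(payload: dict) -> list[str]:
    """Return problems found in a connector payload ({"name": ..., "config": {...}}).

    Hypothetical helper: checks only the rules described in this guide.
    """
    problems = []
    config = payload.get("config", {})

    if config.get("connector.class") != MONGODB_CONNECTOR:
        problems.append(f"connector.class must be {MONGODB_CONNECTOR}")

    # Omitting capture.mode falls back to the default, change_streams_update_full.
    if config.get("capture.mode", "change_streams_update_full") != "change_streams_update_full":
        problems.append("capture.mode must be change_streams_update_full for ENVELOPE UPSERT")

    if not config.get("topic.prefix"):
        problems.append("topic.prefix is required")

    if config.get("capture.scope") == "database":
        target = config.get("capture.target")
        if not target:
            problems.append('capture.target is required when capture.scope is "database"')
        # Entries are treated as literal <db>.<collection> names (assumption).
        entries = [e.strip() for e in config.get("collection.include.list", "").split(",") if e.strip()]
        for entry in entries:
            database = entry.split(".", 1)[0]
            if target and database != target:
                problems.append(f"{entry} is outside capture.target {target!r}")

    return problems


def load_and_validate(path: str) -> list[str]:
    """Load a connector JSON file and validate it."""
    with open(path, encoding="utf-8") as f:
        return validate_connector_config(json.load(f))
```

For the `dbz_mongodb_connector.json` file created in the next step, `load_and_validate("dbz_mongodb_connector.json")` should return an empty list.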
Create a dbz_mongodb_connector.json file with your connector configuration:
```json
{
  "name": "mongodb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://debezium_materialize_user:<associated_pwd>@host1:27017,host2:27017,host3:27017/?replicaSet=rs0&authSource=test",
    "topic.prefix": "mdb-prod-rs0",
    "collection.include.list": "test.orders,test.items",
    "capture.mode": "change_streams_update_full",
    "capture.scope": "database",
    "capture.target": "test",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
  }
}
```
Required updates:

- Update `mongodb.connection.string` with your MongoDB connection string for
  the `debezium_materialize_user`. For example:
  - `"mongodb+srv://debezium_materialize_user:<associated_pwd>@host[/[defaultauthdb][?options]]"`
  - `"mongodb://debezium_materialize_user:<associated_pwd>@host1[:port1][,...hostN[:portN]]/?replicaSet=<replica_set_name>&authSource=test"`
- Update `collection.include.list` with the collections you want to capture.
- Update `topic.prefix` with a unique prefix for your environment.
Optional modifications:

- `capture.scope`: Change to `"deployment"` to monitor changes across all
  databases (requires broader MongoDB permissions for your Debezium user).
- `capture.target`: Only needed if `capture.scope` is `"database"`.
- `transforms` and `transforms.unwrap.type`: You can omit or customize these.

{{< tip >}}
When using the `unwrap` transform of type `ExtractNewDocumentState` with
MongoDB and Avro serialization, the Avro schema is inferred from the first
document processed. Because MongoDB allows the same field to have different
BSON types across documents, ensure that each field uses a consistent BSON
type across the collection to avoid schema inference errors.

For example, if the first document has `price: 40`, the registered Avro
schema infers the `price` field type to be `int`. If a subsequent document
has `price: 2.25` (a non-integer value), it causes a schema mismatch error.
To avoid this, explicitly specify the BSON type in your documents using
`NumberDecimal()`, e.g., `price: NumberDecimal("40.00")` and
`price: NumberDecimal("2.25")`.

If you cannot enforce a consistent BSON type, you can omit the `unwrap`
transform.
{{< /tip >}}
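To find fields that already mix types before you enable the `unwrap` transform, you can scan the collection. The Python sketch below is a hypothetical helper that uses Python type names as a stand-in for BSON types: with `pymongo`, BSON int32, double, and decimal128 values decode to `int`, `float`, and `bson.decimal128.Decimal128` respectively. It checks top-level fields only and skips `null` values.

```python
from collections import defaultdict
from typing import Iterable, Mapping


def inconsistent_field_types(documents: Iterable[Mapping]) -> dict[str, set[str]]:
    """Return {field: {type names}} for top-level fields seen with more than one type.

    Hypothetical helper: Python type names approximate BSON types here.
    """
    seen = defaultdict(set)
    for doc in documents:
        for field, value in doc.items():
            if value is None:
                # Nulls are skipped; they do not indicate a conflicting type.
                continue
            seen[field].add(type(value).__name__)
    return {field: types for field, types in seen.items() if len(types) > 1}
```

Assuming `pymongo`, you could pass it a cursor such as `MongoClient(uri).test.items.find()`. For the documents `{price: 40}` and `{price: 2.25}` from the tip above, it reports `price` with the types `int` and `float`.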
Register the connector with Kafka Connect:
```bash
curl -X POST -H "Content-Type:application/json" \
  http://<your-host>:8083/connectors \
  -d @dbz_mongodb_connector.json
```

Replace `<your-host>` with your Kafka Connect hostname or IP address.
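After registering, you can confirm that the connector and its tasks are running through the Kafka Connect REST API (`GET /connectors/<name>/status`, e.g., `curl http://<your-host>:8083/connectors/mongodb-connector/status`). The Python sketch below fetches that status with the standard library and summarizes it. The function names are hypothetical, and it assumes the REST API listens on port 8083 over plain HTTP.

```python
import json
import urllib.request


def fetch_connector_status(host: str, name: str, port: int = 8083) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API (assumes plain HTTP)."""
    url = f"http://{host}:{port}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """True if the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


def failed_task_traces(status: dict) -> list[str]:
    """Collect the stack traces that Kafka Connect attaches to FAILED tasks."""
    return [t.get("trace", "") for t in status.get("tasks", []) if t.get("state") == "FAILED"]
```

For example, `is_running(fetch_connector_status("<your-host>", "mongodb-connector"))`. If a task reports `FAILED`, `failed_task_traces()` returns its stack trace, which is where errors such as the schema mismatch described above would surface.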
In Materialize, create the Kafka connection and the Confluent Schema Registry
connection.
Create the Kafka connection. For example, if using `SASL_PLAINTEXT`:

```mzsql
CREATE SECRET IF NOT EXISTS kafka_secret AS '<kafka-password>';

CREATE CONNECTION IF NOT EXISTS kafka_connection TO KAFKA (
    BROKER '<kafka-bootstrap-server>:9092',
    SECURITY PROTOCOL = 'SASL_PLAINTEXT',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = '<kafka-SASL-username>',
    SASL PASSWORD = SECRET kafka_secret
);
```
Create the Confluent Schema Registry connection:

```mzsql
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://schema-registry:8081'
);
```
Create a source for each Kafka topic
(`<topic.prefix>.<database>.<collection>`):

```mzsql
CREATE SOURCE mdb_items
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.items')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;

CREATE SOURCE mdb_orders
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;
```
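If you capture more collections, deriving each topic name from the connector configuration helps avoid pointing two sources at the same topic. The Python sketch below is a hypothetical helper: it builds `<topic.prefix>.<db>.<collection>` names from `topic.prefix` and `collection.include.list` (treated as literal names, not regular expressions) and renders `CREATE SOURCE` statements in the same form as in this guide, reusing the `kafka_connection` and `csr_connection` names and an assumed `mdb_<collection>` naming convention.

```python
def kafka_topics(config: dict) -> list[str]:
    """Derive <topic.prefix>.<db>.<collection> topic names (hypothetical helper).

    Assumes collection.include.list holds literal names; Debezium treats the
    entries as regular expressions, so patterns are not expanded here.
    """
    prefix = config["topic.prefix"]
    entries = config.get("collection.include.list", "").split(",")
    return [f"{prefix}.{entry.strip()}" for entry in entries if entry.strip()]


def create_source_sql(source_name: str, topic: str) -> str:
    """Render a CREATE SOURCE statement in the form used in this guide."""
    return (
        f"CREATE SOURCE {source_name}\n"
        f"FROM KAFKA CONNECTION kafka_connection (TOPIC '{topic}')\n"
        "FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection\n"
        "ENVELOPE UPSERT;"
    )


def source_statements(config: dict) -> list[str]:
    """One statement per topic, naming each source mdb_<collection> (assumed convention)."""
    return [create_source_sql("mdb_" + t.rsplit(".", 1)[1], t) for t in kafka_topics(config)]
```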
Query the data using standard SQL.

To query `mdb_items`:

```mzsql
SELECT * FROM mdb_items;
```
{{< note >}}
The results include the Kafka message key field, `id`, as a column.
{{< /note >}}
```
id | _id | item_id | item | price | currency
--------------------------+--------------------------+---------+-----------------------+-------+----------
693c608a8de2a41c2ba365f3 | 693c608a8de2a41c2ba365f3 | 8 | donut | 1.25 | USD
693c608a8de2a41c2ba365ec | 693c608a8de2a41c2ba365ec | 1 | brownie | 2.25 | USD
693c608a8de2a41c2ba365f2 | 693c608a8de2a41c2ba365f2 | 7 | cupcake | 3.00 | USD
693c608a8de2a41c2ba365f4 | 693c608a8de2a41c2ba365f4 | 9 | egg tart | 2.50 | USD
```
{{< tip >}}
If you did not use the unwrap transform, the document is stored as a JSON
string in the after field in mdb_items. You can create a parsing
view to map the individual document fields to columns instead.
{{< /tip >}}
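The parsing view itself is written in SQL. To prototype the field mapping first, for example to see how extended JSON wrappers such as `$oid` and `$numberDecimal` should become plain column values, the Python sketch below parses an `after` string. It assumes the string is MongoDB Extended JSON, as in Debezium's MongoDB change event examples; the converter table and function names are hypothetical, and other wrappers (such as `$date`) are left as-is.

```python
import json
from decimal import Decimal

# Single-key Extended JSON wrappers this sketch unwraps (assumed subset).
_CONVERTERS = {
    "$oid": str,
    "$numberInt": int,
    "$numberLong": int,
    "$numberDouble": float,
    "$numberDecimal": Decimal,
}


def _unwrap(value):
    """Recursively replace known {"$wrapper": "..."} objects with plain values."""
    if isinstance(value, dict):
        if len(value) == 1:
            key, inner = next(iter(value.items()))
            if key in _CONVERTERS:
                return _CONVERTERS[key](inner)
        return {k: _unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_unwrap(v) for v in value]
    return value


def parse_after(after: str) -> dict:
    """Parse a Debezium `after` string into a dict of plain Python values."""
    return _unwrap(json.loads(after))
```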
To query `mdb_orders`:

```mzsql
SELECT * FROM mdb_orders;
```
{{< note >}}
The results include the Kafka message key field, `id`, as a column.
{{< /note >}}
```
id | _id | order_id | order_date | item | quantity | status
--------------------------+--------------------------+----------+-------------------------+-----------------------+----------+------------
693c608a8de2a41c2ba365d4 | 693c608a8de2a41c2ba365d4 | 3 | 2025-12-12 18:05:54.648 | donut | 36 | Pending
693c608a8de2a41c2ba365e3 | 693c608a8de2a41c2ba365e3 | 8 | 2025-12-10 18:50:54.648 | donut | 12 | Shipped
693c608a8de2a41c2ba365cc | 693c608a8de2a41c2ba365cc | 1 | 2025-12-12 18:35:54.648 | brownie | 10 | Pending
693c608a8de2a41c2ba365d2 | 693c608a8de2a41c2ba365d2 | 2 | 2025-12-12 18:20:54.648 | brownie | 20 | Pending
```
{{< tip >}}
If you did not use the unwrap transform, the document is stored as a JSON
string in the after field in mdb_orders. You can create a parsing
view to map the individual document fields to
columns instead.
{{< /tip >}}