docs/server/features/connectors/sinks/mongo.md
The MongoDB sink pulls messages from a KurrentDB stream and stores them in a collection. The records will be serialized into BSON documents, so the data must be valid for BSON format.
Before using the MongoDB sink connector, ensure you have:
::: tip See the Data Protection documentation for instructions on configuring the encryption token. :::
You can create the MongoDB Sink connector as follows. Replace {id} with your desired connector ID:
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"settings": {
"instanceTypeName": "mongo-db-sink",
"connectionString": "mongodb://127.0.0.1:27020",
"database": "my_database",
"collection": "my_collection",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream"
}
}
After creating and starting the MongoDB sink connector, every time an event is
appended to the example-stream, the MongoDB sink connector will send the
record to the specified collection in the database. You can find a list of
available management API endpoints in the API Reference.
Adjust these settings to specify the behavior and interaction of your MongoDB sink connector with KurrentDB, ensuring it operates according to your requirements and preferences.
::: tip The MongoDB sink inherits a set of common settings that are used to configure the connector. The settings can be found in the Sink Options page. :::
The MongoDB sink can be configured with the following options:
| Name | Details |
|---|---|
database | required |
Description:
The name of the database where the records will be stored. |
| collection | required
Description:
The collection name that resides in the database to push records to. |
| connectionString | required
Description: The MongoDB URI to which the connector connects.
Default: "mongodb://mongoadmin:secret@localhost:27017/admin" |
| documentId:source | Description:
The attribute used to generate the document id.
Default: "recordId"
Accepted Values:
"recordId", "stream", "headers", or "streamSuffix" |
| documentId:expression | Description:
The expression used to format the document id based on the selected source. This allows for custom id generation logic.Default: "250" |
| certificate:rawData | Description:
Base64 encoded x509 certificate.
Default: "" |
| certificate:password | Description:
The password used to access the x509 certificate for secure connections.
Default: "" |
| batching:batchSize | Description:
Threshold batch size at which the sink will push the batch of records to the MongoDB collection.
Default: "1000" |
| batching:batchTimeoutMs | Description:
Threshold time in milliseconds at which the sink will push the current batch of records to the MongoDB collection, regardless of the batch size.
Default: "250" |
Resilience options can be found in the Resilience Configuration section.
The MongoDB sink connector automatically includes these default headers in each document sent to the collection. These
headers provide metadata about the event and are stored in a _metadata field
within the document.
This MongoDB sink connector currently only supports SCRAM and X.509 certificate authentication.
To use SCRAM for authentication, include the username and password in the
connection string and set the authMechanism parameter in the connection string
to either SCRAM-SHA-1 or SCRAM-SHA-256 to select the desired MongoDB
authentication mechanism. For more explanations on the connection string URI
refer to the official MongoDB documentation on Authentication Mechanism.
::: note MongoDB version 4.0 and later uses SCRAM-SHA-256 as the default authentication mechanism if the MongoDB server version supports it. :::
To use X.509 certificate authentication, include the base64 encoded x509 certificate and the password in the settings. You can use an online tool like base64encode to encode your certificate.
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"certificate:rawData": "base64encodedstring",
"certificate:password": "password"
}
The id of the document can be generated automatically based on the source specified and expression if needed. The following options are available:
By default, the MongoDB sink connector uses the record's Id as the document ID. This is the unique identifier generated for every record in KurrentDB.
Set Document ID using Stream ID
You can extract part of the stream name using a regular expression (regex) to
define the document id. The expression is optional and can be customized based
on your naming convention. In this example, the expression captures the stream
name up to _data.
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "stream",
"documentId:expression": "^(.*)_data$"
}
Alternatively, if you only need the last segment of the stream name (after a
hyphen), you can use the streamSuffix source. This
doesn't require an expression since it automatically extracts the suffix.
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "streamSuffix"
}
The streamSuffix source is useful when stream names follow a structured
format, and you want to use only the trailing part as the document ID. For
example, if the stream is named user-123, the document ID would be 123.
Set Document ID from Headers
You can create the document ID by combining values from a record's metadata.
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "headers",
"documentId:expression": "key1,key2"
}
Specify the header keys you want to use in the documentId:expression field (e.g., key1,key2). The connector will concatenate the header values with a hyphen (-) to create the partition key.
For example, if your event has headers key1: regionA and key2: zone1, the partition key will be regionA-zone1.
Learn how to set up and use a MongoDB Sink connector in KurrentDB through a tutorial.