docs/server/features/connectors/sinks/sql.md
The SQL sink connector writes events from KurrentDB to SQL databases by executing configurable SQL statements. You can define mappings between event types and SQL statement templates to control how events are persisted.
::: warning Content Type Requirement
This connector only processes events with the application/json content type. Events with other content types will fail processing.
:::
Before using the SQL sink connector, ensure you have:
::: tip See the Data Protection documentation for instructions on configuring the encryption token. :::
You can create the SQL Server sink connector as follows. Replace {id} with your desired connector ID:
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"instanceTypeName": "sql-sink",
"type": "SqlServer",
"connectionString": "Server=127.0.0.1,1433;Database=master;User Id=sa;Password=YourPassword123;TrustServerCertificate=True",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "vehicles",
"reducer:mappings": "<base64-encoded-mappings-json>"
}
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"instanceTypeName": "sql-sink",
"type": "PostgreSql",
"connectionString": "Host=127.0.0.1;Port=54322;Database=postgres;Username=postgres;Password=postgres",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "vehicles",
"reducer:mappings": "<base64-encoded-mappings-json>"
}
Define SQL statement templates with parameter placeholders (prefixed with @) and JavaScript extractor functions that extract values from records.
Example configuration:
{
"VehicleRegistered": {
"Statement": "INSERT INTO vehicle_registrations (id, vin) VALUES (@id, @vin)",
"Extractor": "(record) => ({ id: record.value.registrationId, vin: record.value.vin })"
},
"VehicleTransferred": {
"Statement": "UPDATE vehicle_registrations SET owner_name = @newOwnerName WHERE vin = @vin",
"Extractor": "(record) => ({ vin: record.value.vin, newOwnerName: record.value.newOwnerName })"
}
}
The configuration is passed as a base64-encoded JSON string using the reducer:mappings option.
::: note
The extractor function receives a record parameter following the KurrentDB record structure. It must return an object with properties matching the parameter placeholders in the SQL statement (without the @ prefix).
:::
Use helper functions to ensure values are correctly typed for your database columns:
| Function | Use For | Example |
|---|---|---|
Guid(string) | UUID/UNIQUEIDENTIFIER columns | Guid(record.value.id) |
DateTime(string) | TIMESTAMP/DATETIME columns | DateTime(record.value.createdAt) |
TimeSpan(string) | TIME/INTERVAL columns | TimeSpan(record.value.duration) |
Example:
{
"OrderPlaced": {
"Statement": "INSERT INTO orders (id, placed_at, customer_name) VALUES (@id, @placedAt, @customerName)",
"Extractor": "(record) => ({ id: Guid(record.value.orderId), placedAt: DateTime(record.value.timestamp), customerName: record.value.customer.name })"
}
}
Adjust these settings to specify the behavior and interaction of your SQL sink connector with KurrentDB, ensuring it operates according to your requirements and preferences.
::: tip The SQL sink connector inherits a set of common settings that are used to configure the connector. The settings can be found in the Sink Options page. :::
The SQL sink connector can be configured with the following options:
| Name | Details |
|---|---|
type | Type: Enum |
Description: Type of SQL database to connect to.
Accepted Values: SqlServer, PostgreSql
Default: SqlServer |
| connectionString | required
Description:
Connection string to the SQL database. Authentication options will override credentials in the connection string. |
| reducer:mappings | Description:
Base64-encoded JSON object mapping schema names to SQL statement templates and parameter extractors. |
| Name | Details |
|---|---|
authentication:username | Description: Username for database authentication. |
authentication:password | Description: Password for database authentication. |
This will overwrite any username/password specified in the connection string.
| Name | Details |
|---|---|
authentication:clientCertificate | Description: Base64 encoded client certificate for mutual TLS. |
authentication:certificatePassword | Description: Password for the client certificate. |
| Name | Details |
|---|---|
resilience:enabled | Type: boolean |
Description: Enables resilience features for database operations.
Default: true |
| resilience:commandTimeout | Type: int
Description: Command timeout in seconds.
Default: 30 |
| resilience:connectionTimeout | Type: int
Description: Connection timeout in seconds for establishing the initial database connection.
Default: 15 |
| resilience:minPoolSize | Type: int
Description: Minimum number of connections in the pool.
Default: 0 |
| resilience:maxPoolSize | Type: int
Description: Maximum number of connections in the pool.
Default: 100 |
| resilience:connectionLifetime | Type: int
Description: Maximum lifetime of a connection in seconds. When a connection is returned to the pool, it is destroyed if its lifetime exceeds this value.
Default: 0 (no limit) |
| resilience:firstDelayBound:upperLimitMs | Type: double
Description: Upper time limit in milliseconds for the first backoff delay bound.
Default: 60000 (1 minute) |
| resilience:firstDelayBound:delayMs | Type: double
Description: Delay in milliseconds for the first backoff bound.
Default: 5000 (5 seconds) |
| resilience:secondDelayBound:upperLimitMs | Type: double
Description: Upper time limit in milliseconds for the second backoff delay bound.
Default: 3600000 (1 hour) |
| resilience:secondDelayBound:delayMs | Type: double
Description: Delay in milliseconds for the second backoff bound.
Default: 600000 (10 minutes) |
| resilience:thirdDelayBound:upperLimitMs | Type: double
Description: Upper time limit in milliseconds for the third backoff delay bound. -1 means unlimited.
Default: -1 |
| resilience:thirdDelayBound:delayMs | Type: double
Description: Delay in milliseconds for the third backoff bound.
Default: 3600000 (1 hour) |
The settings commandTimeout, connectionTimeout, minPoolSize, maxPoolSize, and connectionLifetime will override any corresponding values specified in the connection string.
This example demonstrates how to insert vehicle registration records into a local PostgreSQL instance on Supabase using the SQL sink connector.
Run the following command to get your local Supabase connection details:
supabase status
Expected output:
Database URL: postgresql://postgres:[email protected]:54322/postgres
Studio URL: http://127.0.0.1:54323
...
Note down the Studio URL because you'll need it to verify the data insertion in step 6.
The SQL connector requires the connection string in ADO.NET format:
Host=<host>;Port=<port>;Database=<database>;Username=<username>;Password=<password>
Example using the output above:
Host=127.0.0.1;Port=54322;Database=postgres;Username=postgres;Password=postgres
http://127.0.0.1:54323CREATE TABLE vehicle_registrations (
id UUID PRIMARY KEY,
vin VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Define SQL statement templates with parameter extractors for each schema name. Use helper functions like Guid() to convert string values to their proper types:
{
"VehicleRegistered": {
"Statement": "INSERT INTO vehicle_registrations (id, vin) VALUES (@id, @vin)",
"Extractor": "(record) => ({ id: Guid(record.value.registrationId), vin: record.value.vin })"
}
}
::: tip
The Guid() helper function converts the string registrationId to a proper System.Guid type, which maps to PostgreSQL's UUID type.
:::
PowerShell:
$mappings = @'
{
"VehicleRegistered": {
"Statement": "INSERT INTO vehicle_registrations (id, vin) VALUES (@id, @vin)",
"Extractor": "(record) => ({ id: Guid(record.value.registrationId), vin: record.value.vin })"
}
}
'@
$encoded = [Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes($mappings))
Write-Output $encoded
Bash/Linux:
echo -n '{
"VehicleRegistered": {
"Statement": "INSERT INTO vehicle_registrations (id, vin) VALUES (@id, @vin)",
"Extractor": "(record) => ({ id: Guid(record.value.registrationId), vin: record.value.vin })"
}
}' | base64 -w 0
Expected output:
ewogICJWZWhpY2xlUmVnaXN0ZXJlZCI6IHsKICAgICJTdGF0ZW1lbnQiOiAiSU5TRVJUIElOVE8gdmVoaWNsZV9yZWdpc3RyYXRpb25zIChpZCwgdmluKSBWQUxVRVMgKEBpZCwgQHZpbikiLAogICAgIkV4dHJhY3RvciI6ICIocmVjb3JkKSA9PiAoeyBpZDogcmVjb3JkLnZhbHVlLnJlZ2lzdHJhdGlvbklkLCB2aW46IHJlY29yZC52YWx1ZS52aW4gfSkiCiAgfQp9
Create the connector:
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"settings": {
"instanceTypeName": "sql-sink",
"type": "PostgreSql",
"connectionString": "Host=127.0.0.1;Port=54322;Database=postgres;Username=postgres;Password=postgres",
"reducer:mappings": "<base64-encoded-mappings-from-step-5>",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "vehicles"
}
}
Start the connector:
POST /connectors/{id}/start
Host: localhost:2113
::: tip
The connector will now listen for records on the vehicles stream and insert them into the database.
:::
Append a test event to the vehicles stream:
POST /streams/vehicles
Host: localhost:2113
Content-Type: application/vnd.eventstore.events+json
[
{
"eventId": "{{$guid}}",
"eventType": "VehicleRegistered",
"data": {
"registrationId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"vin": "1HGBH41JXMN109186"
}
}
]
http://127.0.0.1:54323| id | vin | created_at |
|---|---|---|
| a1b2c3d4-e5f6-7890-abcd-ef1234567890 | 1HGBH41JXMN109186 | 2025-11-26 14:23:15 |