# NATS Engine
This engine allows integrating ClickHouse with NATS.
NATS lets you:

- Publish or subscribe to message subjects.
- Process new messages as they become available.
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5',]
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
```
Required parameters:

- `nats_url` – host:port (for example, `localhost:4222`).
- `nats_subjects` – List of subjects for the NATS table to subscribe/publish to. Supports wildcard subjects like `foo.*.bar` or `baz.>`.
- `nats_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the Formats section.

Optional parameters:

- `nats_schema` – Parameter that must be used if the format requires a schema definition. For example, Cap'n Proto requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `nats_stream` – The name of an existing stream in NATS JetStream.
- `nats_consumer` – The name of an existing durable pull consumer in NATS JetStream.
- `nats_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. For NATS core only.
- `nats_queue_group` – Name of the queue group of NATS subscribers. Default is the table name.
- `nats_max_reconnect` – Deprecated and has no effect; reconnection is performed permanently with the `nats_reconnect_wait` timeout.
- `nats_reconnect_wait` – Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` – Server list for connection. Can be specified to connect to a NATS cluster.
- `nats_skip_broken_messages` – NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N`, the engine skips N NATS messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` – Number of rows collected by poll(s) for flushing data from NATS. Default: `max_insert_block_size`.
- `nats_flush_interval_ms` – Timeout for flushing data read from NATS. Default: `stream_flush_interval_ms`.
- `nats_username` – NATS username.
- `nats_password` – NATS password.
- `nats_token` – NATS auth token.
- `nats_credential_file` – Path to a NATS credentials file.
- `nats_startup_connect_tries` – Number of connect tries at startup. Default: `5`.
- `nats_max_rows_per_message` – The maximum number of rows written in one NATS message for row-based formats. Default: `1`.
- `nats_handle_error_mode` – How to handle errors for the NATS engine. Possible values: `default` (an exception is thrown if parsing a message fails), `stream` (the exception message and the raw message are saved in the virtual columns `_error` and `_raw_message`).

SSL connection:
For a secure connection, use `nats_secure = 1`.
Certificate verification is controlled by the `CLICKHOUSE_NATS_TLS_SECURE` environment variable.
If the certificate is expired, self-signed, missing, or otherwise invalid, disable verification by setting `CLICKHOUSE_NATS_TLS_SECURE=0`.
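The wildcard forms mentioned for `nats_subjects` follow standard NATS semantics: `*` matches exactly one dot-separated token, while `>` matches one or more trailing tokens. A minimal Python sketch of that matching logic, for illustration only (the actual matching is performed by the NATS server, not by ClickHouse):

```python
def nats_subject_matches(pattern: str, subject: str) -> bool:
    """Check a subject against a NATS-style pattern.

    '*' matches exactly one dot-separated token; '>' matches one or
    more trailing tokens. Illustrative sketch only -- real matching
    is done by the NATS server.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":  # '>' must be the last token and swallows the rest
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:  # literal token must match exactly
            return False
    return len(p_tokens) == len(s_tokens)

# Examples mirroring the patterns from the parameter description:
assert nats_subject_matches("foo.*.bar", "foo.any.bar")
assert not nats_subject_matches("foo.*.bar", "foo.a.b.bar")
assert nats_subject_matches("baz.>", "baz.a.b.c")
assert not nats_subject_matches("baz.>", "baz")
```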
Writing to a NATS table:

If the table reads from only one subject, any insert will publish to that same subject.
However, if the table reads from multiple subjects, you need to specify which subject to publish to.
That is why, when inserting into a table with multiple subjects, the `stream_like_engine_insert_queue` setting is required.
You can select one of the subjects the table reads from and publish your data there. For example:
```sql
CREATE TABLE queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
  SETTINGS nats_url = 'localhost:4444',
           nats_subjects = 'subject1,subject2',
           nats_format = 'JSONEachRow';

INSERT INTO queue
SETTINGS stream_like_engine_insert_queue = 'subject2'
VALUES (1, 1);
```
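On the producer side (outside ClickHouse), a `JSONEachRow` message for the `queue` table above is simply one JSON object per row, newline-separated. A hedged sketch of building such a payload; the column names `key`/`value` come from the example table, and any NATS client library could then publish these bytes to `subject1` or `subject2`:

```python
import json

def to_json_each_row(rows: list[dict]) -> bytes:
    """Serialize rows as JSONEachRow: one compact JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in rows).encode()

payload = to_json_each_row([{"key": 1, "value": 1}, {"key": 2, "value": 2}])
# Each line is an independent JSON document, as the JSONEachRow format expects.
assert payload == b'{"key":1,"value":1}\n{"key":2,"value":2}'
```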
Format settings can also be added along with NATS-related settings.
Example:
```sql
CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
) ENGINE = NATS
  SETTINGS nats_url = 'localhost:4444',
           nats_subjects = 'subject1',
           nats_format = 'JSONEachRow',
           date_time_input_format = 'best_effort';
```
The NATS server configuration can be added using the ClickHouse config file. More specifically, you can add your password for the NATS engine:
```xml
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>
```
SELECT is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using materialized views. To do this:

1. Use the engine to create a NATS consumer and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into the previously created table.
When the materialized view joins the engine, it starts collecting data in the background. This allows you to continually receive messages from NATS and convert them to the required format using SELECT.
One NATS table can have as many materialized views as you like. They do not read data from the table directly, but receive new records (in blocks), so you can write to several tables with different levels of detail (with grouping/aggregation and without).
Example:
```sql
CREATE TABLE queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
  SETTINGS nats_url = 'localhost:4444',
           nats_subjects = 'subject1',
           nats_format = 'JSONEachRow',
           date_time_input_format = 'best_effort';

CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;

SELECT key, value FROM daily ORDER BY key;
```
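Since several materialized views can consume the same NATS table, a second, coarser-grained view can aggregate the same stream in parallel. A hedged sketch under that idea; the table and view names here (`totals`, `consumer_totals`) are illustrative, not part of the original example:

```sql
-- Illustrative: a second, aggregating consumer of the same queue table.
CREATE TABLE totals (key UInt64, value_sum UInt64)
ENGINE = SummingMergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer_totals TO totals
AS SELECT key, sum(value) AS value_sum FROM queue GROUP BY key;
```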
To stop receiving stream data or to change the conversion logic, detach the materialized view:
```sql
DETACH TABLE consumer;
ATTACH TABLE consumer;
```
If you want to change the target table by using ALTER, we recommend disabling the materialized view to avoid discrepancies between the target table and the data from the view.
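That recommendation could look like the following sequence; this is a sketch, and the added `inserted_at` column is purely illustrative:

```sql
-- Detach the view first so no data arrives while the target changes.
DETACH TABLE consumer;

-- Illustrative schema change on the target table.
ALTER TABLE daily ADD COLUMN inserted_at DateTime DEFAULT now();

-- Re-attach the view to resume consumption.
ATTACH TABLE consumer;
```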
Virtual columns:

- `_subject` – NATS message subject. Data type: `String`.

Additional virtual columns when `nats_handle_error_mode = 'stream'`:

- `_raw_message` – Raw message that couldn't be parsed successfully. Data type: `Nullable(String)`.
- `_error` – Exception message that occurred during failed parsing. Data type: `Nullable(String)`.

Note: the `_raw_message` and `_error` virtual columns are filled only when an exception occurs during parsing; they are always `NULL` when a message is parsed successfully.
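With `nats_handle_error_mode = 'stream'`, malformed messages can be routed aside for inspection instead of aborting consumption. A hedged sketch of that pattern, assuming a `queue` table created with that setting and an illustrative `errors_table` target:

```sql
-- Illustrative: capture parse failures (subject, raw bytes, error text)
-- into a separate table for later inspection.
CREATE MATERIALIZED VIEW failed_messages TO errors_table
AS SELECT _subject, _raw_message, _error
FROM queue
WHERE _error IS NOT NULL;
```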
The NATS engine supports all formats supported in ClickHouse. The number of rows in one NATS message depends on whether the format is row-based or block-based:

- For row-based formats, the number of rows in one NATS message can be controlled by the setting `nats_max_rows_per_message`.
- For block-based formats, a block cannot be divided into smaller parts, but the number of rows in one block can be controlled by the general setting `max_block_size`.

NATS JetStream:

Before using the NATS engine with NATS JetStream, you must create a NATS stream and a durable pull consumer. For this, you can use, for example, the `nats` utility from the NATS CLI package:
<details>
<summary>Creating a stream</summary>

```shell
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

              Subjects: stream_subject
              Replicas: 1
               Storage: File

Options:
             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
Allows Per-Message TTL: false
        Allows Rollups: false

Limits:
       Maximum Messages: unlimited
    Maximum Per Subject: unlimited
          Maximum Bytes: unlimited
            Maximum Age: unlimited
   Maximum Message Size: unlimited
      Maximum Consumers: unlimited

State:
          Messages: 0
             Bytes: 0 B
    First Sequence: 0
     Last Sequence: 0
  Active Consumers: 0
```

</details>
<details>
<summary>Creating a consumer</summary>

```shell
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all)
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:
             Name: consumer_name
        Pull Mode: true
   Deliver Policy: All
       Ack Policy: Explicit
         Ack Wait: 30.00s
    Replay Policy: Instant
  Max Ack Pending: 1,000
Max Waiting Pulls: 512

State:
  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
```

</details>
After creating the stream and durable pull consumer, you can create a table with the NATS engine. To do this, you need to specify the `nats_stream`, `nats_consumer_name`, and `nats_subjects` settings:
```sql
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
) ENGINE = NATS
  SETTINGS nats_url = 'localhost:4222',
           nats_stream = 'stream_name',
           nats_consumer_name = 'consumer_name',
           nats_subjects = 'stream_subject',
           nats_format = 'JSONEachRow';
```
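Reading from the JetStream-backed table then follows the same materialized-view pattern as for core NATS. A sketch under that assumption; the `jet_daily` target table name is illustrative:

```sql
CREATE TABLE jet_daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW jet_consumer TO jet_daily
AS SELECT key, value FROM nats_jet_stream;
```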