docs/en/06-advanced/03-stream.md
Stream processing requirements, such as continuously computing aggregates over newly written data or generating real-time alerts when conditions are met, are common in time-series data processing.
In traditional time-series solutions, Kafka, Flink, and other stream processing systems are often deployed. However, the complexity of these systems brings high development and operations costs. The stream processing engine in TDengine TSDB provides the capability to process incoming data streams in real time. Using SQL, users can define real-time transformations: once data is written into the source table of a stream, it is automatically processed as defined, and results are pushed to target tables according to the trigger mode. This offers a lightweight alternative to complex stream processing systems, while still delivering millisecond-level result latency even under high-throughput data ingestion. Unlike traditional stream processing, TDengine TSDB still operates on continuous unbounded data streams but adopts a trigger–compute decoupling strategy: the trigger condition and the action it launches are defined independently, so a trigger can send an event notification, execute a computation over the trigger table or other tables, or do both.
TDengine TSDB’s stream processing engine also offers additional usability benefits. For varying requirements on result latency, it allows users to balance result timeliness against resource load; for different out-of-order write scenarios, it lets users choose the handling strategy that fits their needs.
For detailed usage instructions, see the SQL Manual.
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
}
trigger_type: {
PERIOD(period_time[, offset_time])
| SLIDING(sliding_val[, offset_time])
| INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time])
| SESSION(ts_col, session_val)
| STATE_WINDOW(col [, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)]
| EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)]
| COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
}
true_for_expr: {
duration_time
| COUNT count_val
| duration_time AND COUNT count_val
| duration_time OR COUNT count_val
}
stream_option: {
WATERMARK(duration_time)
| EXPIRED_TIME(exp_time)
| IGNORE_DISORDER
| DELETE_RECALC
| DELETE_OUTPUT_TABLE
| FILL_HISTORY[(start_time)]
| FILL_HISTORY_FIRST[(start_time)]
| CALC_NOTIFY_ONLY
| LOW_LATENCY_CALC
| PRE_FILTER(expr)
| FORCE_OUTPUT
| MAX_DELAY(delay_time)
| EVENT_TYPE(event_types)
}
notification_definition:
NOTIFY(url [, ...]) [ON (event_types)] [WHERE condition] [NOTIFY_OPTIONS(notify_option[|notify_option])]
notify_option: [NOTIFY_HISTORY | ON_FAILURE_PAUSE]
event_types:
event_type [|event_type]
event_type: {WINDOW_OPEN | WINDOW_CLOSE}
tag_definition:
tag_name type_name [COMMENT 'string_value'] AS expr
After a trigger occurs, different actions can be performed as needed, for example, sending an event notification, executing a computation, or both.
In general, one stream computing task corresponds to a single computation — for example, triggering a computation based on one subtable and storing the result in one output table. Following TDengine’s “one device, one table” design philosophy, if you need to compute results separately for all devices, you would traditionally need to create a separate stream computing task for each subtable. This can be inconvenient to manage and inefficient to process. To address this, TDengine TSDB's stream computing supports trigger grouping. A group is the smallest execution unit in stream computing. Logically, you can think of each group as an independent stream computing task, with its own output table and its own event notifications.
In summary, the number of output tables (subtables or regular tables) produced by a stream computing task equals the number of groups in the trigger table. If no grouping is specified, only one output table (a regular table) is created.
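For example, the following is a minimal sketch of trigger grouping (the super table `stb1`, its column `col1`, and the output names are assumptions): triggering is grouped by `tbname`, so every source subtable forms its own group and writes to its own output subtable of `stb_out`, which is named and tagged using the grouping-column placeholder `%%1` described below.

-- Assumes super table stb1 with subtables and a numeric column col1; output names are illustrative.
CREATE STREAM sm_grouped INTERVAL(10m) SLIDING(10m) FROM stb1 PARTITION BY tbname
INTO stb_out OUTPUT_SUBTABLE(CONCAT('out_', %%1))
TAGS (src_table VARCHAR(192) AS %%1)
AS
SELECT _twstart, avg(col1) FROM %%trows;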
A computation task is the calculation executed by the stream after an event is triggered. It can be any type of query statement, and can operate on the trigger table or on other databases and tables. When performing calculations, you may need to use contextual information from the trigger event. In SQL statements, this information is represented by placeholders, which are replaced with constant values at execution time for each calculation. The available placeholders include:
- `_tprev_ts`: event time of the previous trigger
- `_tcurrent_ts`: event time of the current trigger
- `_tnext_ts`: event time of the next trigger
- `_twstart`: start timestamp of the current window
- `_twend`: end timestamp of the current window
- `_twduration`: duration of the current window
- `_twrownum`: number of rows in the current window
- `_tprev_localtime`: system time of the previous trigger
- `_tnext_localtime`: system time of the next trigger
- `_tgrpid`: ID of the trigger group
- `_tlocaltime`: system time of the current trigger
- `%%n`: reference to a trigger grouping column, where n is the index of the grouping column
- `%%tbname`: reference to the trigger table; can be used in queries as `FROM %%tbname`
- `%%trows`: reference to the trigger dataset of each group in the trigger table, that is, the set of rows that meet the current trigger condition

Control options are used to manage trigger and computation behavior. Multiple options can be specified, but the same option cannot be specified more than once. The available options are those listed as `stream_option` in the syntax above.
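For example, a minimal sketch combining several placeholders with a control option (table and column names are assumptions): session windows that close after 30 seconds of silence report their duration and row count, while data older than one day is treated as expired via `EXPIRED_TIME`.

-- Assumes stb1 has a timestamp column ts and a numeric column col1; the output name is illustrative.
CREATE STREAM sm_sess SESSION(ts, 30s) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(EXPIRED_TIME(1d))
INTO session_stats
AS
SELECT _twstart, _twduration AS win_duration, _twrownum AS win_rows, avg(col1) FROM %%trows;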
Event notifications are optional actions executed after a stream is triggered. Notifications can be sent to applications over the WebSocket protocol. Users specify the events to be notified and the target address for receiving messages. The notification content may include the computation results, or, when no result is produced, only the event-related information.
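For example, the sketch below sends a notification only when a window's average current exceeds a threshold. It assumes that the `WHERE` condition of `NOTIFY` may reference columns of the computation result; the output table name is illustrative.

-- Assumes the NOTIFY WHERE clause can filter on the computed column avg_current.
CREATE STREAM sm_alert INTERVAL(1m) SLIDING(1m) FROM meters PARTITION BY tbname
NOTIFY('ws://localhost:8080/notify') ON (WINDOW_CLOSE) WHERE avg_current > 10
INTO alert_stb
AS
SELECT _twstart, avg(current) AS avg_current FROM %%trows;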
-- Every row written to tb1 triggers a computation: the average of col1 over the last 5 minutes of tb2 is written to tb3.
CREATE STREAM sm1 COUNT_WINDOW(1) FROM tb1
INTO tb3 AS
SELECT _twstart, avg(col1) FROM tb2
WHERE _c0 >= _twend - 5m AND _c0 <= _twend;
-- A count window of 10 rows sliding by 1 over tb1, pre-filtered to rows with col1 > 0; the window average is only sent as a notification on window close and is not stored in an output table.
CREATE STREAM sm2 COUNT_WINDOW(10, 1, col1) FROM tb1
STREAM_OPTIONS(CALC_NOTIFY_ONLY | PRE_FILTER(col1 > 0))
NOTIFY("ws://localhost:8080/notify") ON (WINDOW_CLOSE)
AS
SELECT avg(col1) FROM %%trows;
-- When `temp` stays above 80 for at least 10 minutes, write the window start time and average temperature to the output table; out-of-order data is ignored.
CREATE STREAM `idmp`.`ana_temp` EVENT_WINDOW(start with `temp` > 80 end with `temp` <= 80) TRUE_FOR(10m) FROM `idmp`.`vt_envsens02_471544`
STREAM_OPTIONS( IGNORE_DISORDER)
INTO `idmp`.`ana_temp`
AS
SELECT _twstart+0s as output_timestamp, avg(`temp`) as `avgtemp` FROM `idmp`.`vt_envsens02_471544` where ts >= _twstart and ts <= _twend;
-- Every 5 minutes, compute the 5-minute average of col1 separately for each subtable of stb1; each group writes to its own output subtable of stb2.
CREATE STREAM sm1 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname
WHERE _c0 >= _twstart AND _c0 <= _twend;
In the SQL above, FROM %%tbname WHERE _c0 >= _twstart AND _c0 <= _twend and FROM %%trows are not equivalent. The former means the computation uses data from the trigger group’s corresponding table within the window’s time range; those in-window rows may differ from what %%trows saw at trigger time. The latter means the computation uses only the window data captured at the moment of triggering.
-- As above, but existing historical data is computed first (FILL_HISTORY_FIRST), and MAX_DELAY(1m) limits how long results can be delayed for windows that have not yet closed.
CREATE STREAM sm2 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(MAX_DELAY(1m) | FILL_HISTORY_FIRST)
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname WHERE _c0 >= _twstart AND _c0 <= _twend;
-- Every minute, compute the one-minute average current from meters, store it in avg_stb, and notify both endpoints on window open and close; historical computations also generate notifications (NOTIFY_HISTORY), and processing pauses if notification delivery fails (ON_FAILURE_PAUSE).
CREATE STREAM avg_stream INTERVAL(1m) SLIDING(1m) FROM meters
NOTIFY ('ws://localhost:8080/notify', 'wss://192.168.1.1:8080/notify?key=foo') ON ('WINDOW_OPEN', 'WINDOW_CLOSE') NOTIFY_OPTIONS(NOTIFY_HISTORY | ON_FAILURE_PAUSE)
INTO avg_stb
AS
SELECT _twstart, _twend, AVG(current) FROM %%trows;
-- Every hour, count all rows in tb1 and record the count together with the trigger's system time.
CREATE STREAM sm1 PERIOD(1h)
INTO tb2
AS
SELECT cast(_tlocaltime/1000000 AS TIMESTAMP), count(*) FROM tb1;
-- This stream performs no computation; every hour it only sends a notification to ws://localhost:8080/notify.
CREATE STREAM sm1 PERIOD(1h)
NOTIFY("ws://localhost:8080/notify");
Stream processing in TDengine is architected with a separation of compute and storage, which requires that at least one snode be deployed in the system. Except for data reads, all stream processing functions run exclusively on snodes.
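For example, an snode can be created on an existing dnode and verified before any streams are defined (the dnode ID below is an assumption about your deployment):

-- Assumes a dnode with ID 1 already exists in the cluster.
CREATE SNODE ON DNODE 1;
SHOW SNODES;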
TDengine supports the use of WATERMARK to mitigate issues caused by out-of-order data, updates, and deletions. A WATERMARK is a user-defined duration based on event time that represents the system’s progress in stream processing, reflecting the user’s tolerance for out-of-order data. The current watermark is defined as latest processed event time – WATERMARK interval. Only data with event times earlier than the current watermark are eligible for trigger evaluation. Likewise, only windows or other trigger conditions whose time boundaries are earlier than the current watermark will be triggered.
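For example, in the minimal sketch below (table and column names are assumptions), a 10-minute WATERMARK means each 5-minute window is triggered only after event time has advanced 10 minutes past the window's end, leaving that much room for late, out-of-order rows:

-- Assumes super table stb1 with a numeric column col1; the output name is illustrative.
CREATE STREAM sm_wm INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(WATERMARK(10m))
INTO stb_wm
AS
SELECT _twstart, avg(col1) FROM %%trows;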
For out-of-order, update, or delete scenarios that exceed the WATERMARK, recalculation is used to ensure the correctness of results. Recalculation means re-triggering and re-executing computations for the data range affected by out-of-order, updated, or deleted records. To make this approach effective, users must ensure that their computation statements and source tables are independent of processing time—that is, the same trigger should produce valid results even if executed multiple times.
Recalculation can be either automatic or manual. If automatic recalculation is not needed, it can be disabled via configuration options.
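For example, if out-of-order writes should simply be ignored rather than trigger recalculation, the `IGNORE_DISORDER` option from the syntax above can be used (table and column names are assumptions):

-- Assumes super table stb1 with a numeric column col1; the output name is illustrative.
CREATE STREAM sm_no_recalc INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(IGNORE_DISORDER)
INTO stb_no_recalc
AS
SELECT _twstart, avg(col1) FROM %%trows;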