Compared with traditional stream processing, TDengine TSDB’s stream processing extends both functionality and boundaries. Traditionally, stream processing is defined as a real-time computing paradigm focused on low latency, continuity, and event-time-driven processing of unbounded data streams. TDengine TSDB’s stream processing adopts a trigger–compute decoupling strategy, still operating on continuous unbounded data streams, but with the following enhancements:
TDengine TSDB’s stream processing engine also offers additional usability benefits. For varying requirements on result latency, it allows users to balance between result timeliness and resource load. For different needs in out-of-order write scenarios, it enables users to flexibly choose appropriate handling methods and strategies.
Note: The new stream processing feature is supported starting from v3.3.7.0.
```sql
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name]
    [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])]
    [TAGS (tag_definition [, ...])] [AS subquery]

options: {
    trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
}

trigger_type: {
      PERIOD(period_time[, offset_time])
    | SLIDING(sliding_val[, offset_time])
    | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time])
    | SESSION(ts_col, session_val)
    | STATE_WINDOW(col[, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)]
    | EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)]
    | EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [, ...]) [END WITH end_condition]) [TRUE_FOR(true_for_expr)]
    | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
}

true_for_expr: {
      duration_time
    | COUNT count_val
    | duration_time AND COUNT count_val
    | duration_time OR COUNT count_val
}

stream_option: {WATERMARK(duration_time) | EXPIRED_TIME(exp_time) | IGNORE_DISORDER | DELETE_RECALC | DELETE_OUTPUT_TABLE | FILL_HISTORY[(start_time)] | FILL_HISTORY_FIRST[(start_time)] | CALC_NOTIFY_ONLY | LOW_LATENCY_CALC | PRE_FILTER(expr) | FORCE_OUTPUT | MAX_DELAY(delay_time) | EVENT_TYPE(event_types) | IGNORE_NODATA_TRIGGER}

notification_definition:
    NOTIFY(url [, ...]) [ON (event_types)] [WHERE condition] [NOTIFY_OPTIONS(notify_option [| notify_option])]

notify_option: {NOTIFY_HISTORY | ON_FAILURE_PAUSE}

event_types:
    event_type [| event_type]

event_type: {WINDOW_OPEN | WINDOW_CLOSE}

tag_definition:
    tag_name type_name [COMMENT 'string_value'] AS expr
```
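Putting the pieces together, a minimal stream definition might look like the following sketch. The database `power`, the supertable `meters(ts, current, voltage)`, and all output names are illustrative assumptions, not taken from this document:

```sql
-- Hypothetical schema: power.meters(ts TIMESTAMP, current FLOAT, voltage INT),
-- one subtable per device. Compute the 10-minute average current per device,
-- triggered at each window close.
CREATE STREAM IF NOT EXISTS power.avg_current_stream
  INTERVAL(10m) SLIDING(10m)
  FROM power.meters
  PARTITION BY tbname
  INTO power.avg_current_result
  AS SELECT _twstart AS wstart, AVG(current) AS avg_current
     FROM %%trows;
```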
Event triggers are the driving mechanism for stream processing. The source of an event trigger can vary—it may come from data being written to a table, from the analytical results of computations on a table, or even from no table at all. When the stream processing engine detects that the user-defined trigger conditions are met, it initiates the computation. The number of times the condition is met corresponds exactly to the number of times computation is triggered. The trigger object and the computation object are independent of each other. Users can flexibly define and use various types of windows to generate trigger events, with support for triggering on window open, window close, or both. Group-based triggering is supported, as well as pre-filtering of trigger data so that only data meeting the criteria will be considered for triggering.
The trigger type is specified using trigger_type and includes: scheduled trigger, sliding trigger, time window trigger, session window trigger, state window trigger, event window trigger, and count window trigger. When using state windows, event windows, or count windows with a supertable, they must be used together with partition by tbname.
PERIOD(period_time[, offset_time])
A scheduled trigger is driven by a fixed interval based on the system time, essentially functioning as a scheduled task. It does not belong to the category of window triggers. Parameter definitions are as follows:
Usage Notes:
Applicable scenarios: Situations requiring scheduled computation driven continuously by system time, such as generating daily statistics every hour, or sending scheduled statistical reports once a day.
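For instance, a scheduled trigger that produces an hourly snapshot could be sketched as follows (all database, table, and column names are hypothetical):

```sql
-- Every hour of system time, count all rows written so far.
-- _tlocaltime is in nanoseconds; the conversion to a millisecond timestamp
-- assumes a millisecond-precision target database.
CREATE STREAM power.hourly_snapshot
  PERIOD(1h)
  INTO power.hourly_counts
  AS SELECT CAST(_tlocaltime / 1000000 AS TIMESTAMP) AS calc_time, COUNT(*) AS total_rows
     FROM power.meters;
```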
SLIDING(sliding_val[, offset_time])
A sliding trigger drives execution based on a fixed interval of event time for data written to the trigger table. It is not considered a window trigger. A trigger table must be specified. The trigger times and time offset rules are the same as for scheduled triggers, with the only difference being that the system time is replaced by event time.
Parameter definitions are as follows:
Usage Notes:
Applicable scenarios: Situations where calculations need to be driven continuously and periodically based on event time, such as generating daily statistical data every hour or sending scheduled reports each day.
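A sliding trigger driven by event time might be sketched like this (names are hypothetical; `_tprev_ts` and `_tcurrent_ts` are the placeholders described later in this document):

```sql
-- Every 5 minutes of event time, aggregate the data written since the previous trigger.
CREATE STREAM power.sliding_avg
  SLIDING(5m)
  FROM power.meters
  PARTITION BY tbname
  INTO power.sliding_avg_result
  AS SELECT _tcurrent_ts AS ts, AVG(voltage) AS avg_voltage
     FROM %%tbname
     WHERE ts >= _tprev_ts AND ts < _tcurrent_ts;
```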
INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val)
A time window trigger refers to triggering based on incoming data written to the trigger table, using event time and a fixed window size that slides over time. The INTERVAL window must be specified. This is a type of window trigger, and a trigger table must be specified.
The starting point for a time window trigger is the beginning of the window. By default, windows are divided starting from Unix time 0 (1970-01-01 00:00:00 UTC). You can change the starting point of the window division by specifying a window time offset. Parameter definitions are as follows:
Usage Notes:
Applicable Scenarios: Suitable for event-time-based scheduled window calculations, such as generating hourly statistics for that hour, or calculating data within the last 5-minute window every hour.
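The "last 5-minute window every hour" scenario mentioned above could be expressed roughly as follows (schema and output names are assumptions):

```sql
-- Every hour, compute statistics over the most recent 5-minute window.
CREATE STREAM power.last_5m_each_hour
  INTERVAL(5m) SLIDING(1h)
  FROM power.meters
  PARTITION BY tbname
  INTO power.last_5m_result
  AS SELECT _twstart AS wstart, AVG(current) AS avg_current
     FROM %%trows;
```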
SESSION(ts_col, session_val)
A session window trigger divides the incoming data written to the trigger table into windows based on session boundaries, and triggers when a window starts and/or closes. Parameter definitions are as follows:
Usage Notes:
Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by session windows.
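A session-window stream might be sketched as follows (table and column names are hypothetical):

```sql
-- A window closes after a 30-second gap with no data for the device.
CREATE STREAM power.session_stats
  SESSION(ts, 30s)
  FROM power.meters
  PARTITION BY tbname
  INTO power.session_result
  AS SELECT _twstart AS wstart, _twend AS wend, COUNT(*) AS nrows
     FROM %%trows;
```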
STATE_WINDOW(col[, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)]
A state window trigger divides the written data of the trigger table into windows based on the values in a state column. A trigger occurs when a window is opened and/or closed. Parameter definitions are as follows:
col: The name of the state column.
extend (optional): Specifies the extension strategy for the start and end of a window. The optional values are 0 (default), 1, and 2, representing no extension, backward extension, and forward extension respectively.
zeroth_state (optional): Specifies the "zero state". Windows whose state column equals this value are neither calculated nor output. The value must be an integer, boolean, or string constant. When zeroth_state is specified, the extend value must also be provided explicitly and must not be omitted.
true_for_expr (optional): Specifies the filtering condition for windows. Only windows that meet the condition will generate a trigger. Supports the following four modes:
- TRUE_FOR(duration_time): Filters based on duration only. The window duration must be greater than or equal to duration_time.
- TRUE_FOR(COUNT n): Filters based on row count only. The window row count must be greater than or equal to n.
- TRUE_FOR(duration_time AND COUNT n): Both the duration and row count conditions must be satisfied.
- TRUE_FOR(duration_time OR COUNT n): Either the duration or the row count condition must be satisfied.

Here duration_time is a positive time value with supported units: 1n (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), 1w (weeks). Examples: TRUE_FOR(10m), TRUE_FOR(COUNT 100), TRUE_FOR(10m AND COUNT 100), TRUE_FOR(10m OR COUNT 100).
Usage Notes:
Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by state windows.
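A state-window stream using a zero state and a duration filter might look like the following sketch. The integer column `status` and all names are hypothetical:

```sql
-- status = 0 is declared the zero state: such windows are neither calculated nor
-- output. The extend value (0 = no extension) must be given explicitly whenever
-- a zero state is specified. TRUE_FOR(1m) drops windows shorter than one minute.
CREATE STREAM power.state_stats
  STATE_WINDOW(status, 0, 0) TRUE_FOR(1m)
  FROM power.meters
  PARTITION BY tbname
  INTO power.state_result
  AS SELECT _twstart AS wstart, _twduration AS dur, COUNT(*) AS nrows
     FROM %%trows;
```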
EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)]
An event window trigger partitions the incoming data of the trigger table into windows based on defined event start and end conditions, and triggers when the window opens and/or closes. Parameter definitions are as follows:
start_condition: Definition of the event start condition.
end_condition: Definition of the event end condition.
true_for_expr (optional): Specifies the filtering condition for windows. Only windows that meet the condition will generate a trigger. Supports the following four modes:
- TRUE_FOR(duration_time): Filters based on duration only. The window duration must be greater than or equal to duration_time.
- TRUE_FOR(COUNT n): Filters based on row count only. The window row count must be greater than or equal to n.
- TRUE_FOR(duration_time AND COUNT n): Both the duration and row count conditions must be satisfied.
- TRUE_FOR(duration_time OR COUNT n): Either the duration or the row count condition must be satisfied.

Here duration_time is a positive time value with supported units: 1n (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), 1w (weeks). Examples: TRUE_FOR(10m), TRUE_FOR(COUNT 100), TRUE_FOR(10m AND COUNT 100), TRUE_FOR(10m OR COUNT 100).
Usage Notes:
Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by event windows.
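An event-window stream might be sketched as follows (schema and thresholds are illustrative assumptions):

```sql
-- Window opens when voltage exceeds 220 and closes when it drops back;
-- only windows lasting at least 1 minute produce a trigger.
CREATE STREAM power.overvoltage_stream
  EVENT_WINDOW(START WITH voltage > 220 END WITH voltage <= 220) TRUE_FOR(1m)
  FROM power.meters
  PARTITION BY tbname
  INTO power.overvoltage_result
  AS SELECT _twstart AS wstart, MAX(voltage) AS max_voltage
     FROM %%trows;
```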
EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [, ...]) [END WITH end_condition]) [TRUE_FOR(true_for_expr)]
An event window trigger partitions the incoming data of the trigger table into windows based on event windows. It now supports specifying multiple start conditions and can further subdivide and manage sub-event windows within the original event window based on changes in the effective trigger condition, while introducing the concept of a parent event window to aggregate related sub-event windows. Parameter definitions are as follows:
start_condition_1, start_condition_2 [, ...]: Defines multiple event start conditions. The event window opens when any one of these conditions is satisfied. The system evaluates these conditions in order from first to last, and the first satisfied condition becomes the "effective trigger condition". When all start_conditions are not satisfied, both the parent window and the last sub-window close.
end_condition: Definition of the event end condition. When this condition is satisfied, both the current parent window and the last sub-window close. This parameter is now optional.
true_for_expr (optional): Specifies the filtering condition for windows. Only windows that meet the condition will generate a trigger. Supports the following four modes:
- TRUE_FOR(duration_time): Filters based on duration only. The window duration must be greater than or equal to duration_time.
- TRUE_FOR(COUNT n): Filters based on row count only. The window row count must be greater than or equal to n.
- TRUE_FOR(duration_time AND COUNT n): Both the duration and row count conditions must be satisfied.
- TRUE_FOR(duration_time OR COUNT n): Either the duration or the row count condition must be satisfied.

Here duration_time is a positive time value with supported units: 1n (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), 1w (weeks). Examples: TRUE_FOR(10m), TRUE_FOR(COUNT 100), TRUE_FOR(10m AND COUNT 100), TRUE_FOR(10m OR COUNT 100).
Usage Notes:
Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by event windows, especially in IoT and industrial data management fields where fine-grained monitoring and analysis of events based on multiple dynamically changing conditions is required. For example, in equipment fault alarms, multiple alarm level conditions (such as "load above 90" and "load above 60") can be defined, and when alarm levels change, the escalation or de-escalation of alarm states can be clearly tracked.
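The equipment-alarm scenario above might be expressed as follows. The table `factory.machines` and the column `load_pct` are hypothetical:

```sql
-- Two alarm levels: the first satisfied start condition becomes the effective
-- trigger condition; a change between conditions opens a new sub-event window
-- within the parent event window.
CREATE STREAM factory.alarm_stream
  EVENT_WINDOW(START WITH (load_pct > 90, load_pct > 60) END WITH load_pct <= 60)
  FROM factory.machines
  PARTITION BY tbname
  INTO factory.alarm_result
  AS SELECT _twstart AS wstart, MAX(load_pct) AS peak_load
     FROM %%trows;
```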
COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
A count window trigger partitions the written data from the trigger table based on a counting window, and triggers when the window starts and/or closes. It supports column-based triggering, where the trigger occurs only when the specified columns receive data writes. Parameter definitions are as follows:
Usage Notes:
Applicable Scenarios:
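A count-window stream with column-based triggering might be sketched as follows (all names are illustrative):

```sql
-- Trigger after every 100 rows in which the `current` column receives data.
CREATE STREAM power.count_stream
  COUNT_WINDOW(100, 100, current)
  FROM power.meters
  PARTITION BY tbname
  INTO power.count_result
  AS SELECT _twstart AS wstart, AVG(current) AS avg_current
     FROM %%trows;
```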
After a trigger is activated, different actions can be performed as needed, such as sending an event notification, executing a computation task, or performing both simultaneously.
In general, one stream computing task corresponds to a single computation — for example, triggering a computation based on one subtable and storing the result in one output table. Following TDengine’s “one device, one table” design philosophy, if you need to compute results separately for all devices, you would traditionally need to create a separate stream computing task for each subtable. This can be inconvenient to manage and inefficient to process. To address this, TDengine TSDB's stream computing supports trigger grouping. A group is the smallest execution unit in stream computing. Logically, you can think of each group as an independent stream computing task, with its own output table and its own event notifications. If no group is specified, or if no trigger table is specified (allowed in the case of scheduled triggers), the entire stream computing task will produce only a single computation — effectively meaning there is only one group, which corresponds to a single output table and a single notification. Since each group operates as an independent stream computing task, their computation progress, output frequency, and other behaviors can differ from one another.
In summary, the number of output tables (subtables or regular tables) produced by a stream computing task equals the number of groups in the trigger table. If no grouping is specified, only one output table (a regular table) is created. The currently supported combinations of trigger types and grouping are as follows:
| Trigger Mechanism | Supported Grouping |
|---|---|
| PERIOD, SLIDING, INTERVAL, and SESSION | Subtable, tag, and none |
| Other | Subtable |
```sql
[FROM [db_name.]table_name]
```

A trigger table can be a regular table, supertable, subtable, or virtual table. System tables, views, and queries are not supported. Except for scheduled triggers, which may omit the trigger table, all other trigger types must specify one.
```sql
[PARTITION BY col1 [, ...]]
```

Specifies the columns used for trigger grouping. Multiple columns are supported, but currently only grouping by subtable name and tag columns is supported.
```sql
[INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]

tag_definition:
    tag_name type_name [COMMENT 'string_value'] AS expr
```

By default, the results of a stream are stored in an output table. Each output table contains only the results that have been triggered and computed up to the current time. You can define the structure of the output table, and if grouping is used, you can also specify the tag values for each subtable.
Details are as follows:
- INTO [db_name.]table_name: Optional. Specifies the output table name as table_name and the database name as db_name.
- OUTPUT_SUBTABLE(tbname_expr): Optional. Specifies the name of the output table (subtable) for each trigger group. This cannot be specified without trigger grouping. If not specified, a unique output subtable name is generated automatically for each group. tbname_expr can be any string-valued expression and may reference the trigger group partition columns (from [PARTITION BY col1 [, ...]]). The output length must not exceed the maximum table name length; longer names are truncated. If you do not want different groups to write to the same subtable, you must ensure each group's output table name is unique.
- (column_name1, column_name2 [COMPOSITE KEY][, ...]): Optional. Specifies the column names of the output table. If not specified, each column name is taken from the corresponding column of the computation result. COMPOSITE KEY indicates that the second column is a primary key column, forming a composite primary key with the first column.
- TAGS (tag_definition [, ...]): Optional. Specifies the tag column definitions and values of the output supertable. This can only be specified when trigger grouping is present. If not specified, the tag column definitions and values are derived from all grouping columns, in which case grouping columns must not have duplicate names. When grouping by subtable, the default generated tag column is tag_tbname with type VARCHAR(270). The tag_definition parameters are:
  - tag_name: Name of the tag column.
  - type_name: Data type of the tag column.
  - string_value: Description of the tag column.
  - expr: Expression computing the tag value; it may reference any trigger-table grouping columns (from [PARTITION BY col1 [, ...]]).

```sql
[AS subquery]
```
A computation task is the calculation executed by the stream after an event is triggered. It can be any type of query statement, and can operate on the trigger table or on other databases and tables. Computation tasks are highly flexible and should be carefully designed before creating the stream. Notes:
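A full example combining the output-table clauses with a computation subquery might look like the following sketch (the schema, the `CONCAT`-based subtable naming, and all output names are illustrative assumptions):

```sql
-- One output subtable per device, named explicitly, with a custom tag column.
CREATE STREAM power.per_device_avg
  INTERVAL(10m) SLIDING(10m)
  FROM power.meters
  PARTITION BY tbname
  INTO power.device_avg
  OUTPUT_SUBTABLE(CONCAT('avg_', tbname))
  (wstart, avg_current)
  TAGS (device_name VARCHAR(270) AS tbname)
  AS SELECT _twstart, AVG(current)
     FROM %%trows;
```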
When performing calculations, you may need to use contextual information from the trigger event. In SQL statements, these are represented as placeholders, which are replaced with constant values at execution time for each calculation. Placeholders include:
| Trigger Type | Placeholder | Description |
|---|---|---|
| Scheduled Trigger | _tprev_localtime | System time of the previous trigger (nanosecond precision) |
| Scheduled Trigger | _tnext_localtime | System time of the next trigger (nanosecond precision) |
| Sliding Trigger | _tprev_ts | Event time of the previous trigger (same precision as the record) |
| Sliding Trigger | _tcurrent_ts | Event time of the current trigger (same precision as the record) |
| Sliding Trigger | _tnext_ts | Event time of the next trigger (same precision as the record) |
| Window Trigger | _twstart | Start timestamp of the current window |
| Window Trigger | _twend | End timestamp of the current window. Used only with WINDOW_CLOSE triggers. |
| Window Trigger | _twduration | Duration of the current window. Used only with WINDOW_CLOSE triggers. |
| Window Trigger | _twrownum | Number of rows in the current window. Used only with WINDOW_CLOSE triggers. |
| All | _tgrpid | ID of the trigger group (data type BIGINT) |
| All | _tlocaltime | System time of the current trigger (nanosecond precision) |
| All | %%n | Reference to a trigger grouping column, where n is the column position in [PARTITION BY col1 [, ...]], starting from 1 |
| All | %%tbname | Reference to the trigger table name. Only usable when the trigger grouping contains tbname. Can be used as a query table name (FROM %%tbname). |
| All | %%trows | Reference to the trigger dataset of each group in the trigger table (the dataset that satisfies the current trigger). For scheduled triggers, this refers to the data written to the trigger table between the previous and current triggers. Can only be used as a query table name (FROM %%trows). Applicable only to WINDOW_CLOSE triggers. Recommended for small data volumes. |
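To illustrate how placeholders are substituted at execution time, consider the following sketch (schema and names are hypothetical):

```sql
-- _twstart/_twend bound the query range; %%tbname refers to the subtable of the
-- triggering group (this requires grouping by tbname).
CREATE STREAM power.placeholder_demo
  SESSION(ts, 30s)
  FROM power.meters
  PARTITION BY tbname
  INTO power.session_summary
  AS SELECT _twstart AS wstart, _twrownum AS nrows, AVG(current) AS avg_current
     FROM %%tbname
     WHERE ts >= _twstart AND ts <= _twend;
```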
Usage Restrictions:
```sql
[STREAM_OPTIONS(stream_option [|...])]

stream_option: {WATERMARK(duration_time) | EXPIRED_TIME(exp_time) | IGNORE_DISORDER | DELETE_RECALC | DELETE_OUTPUT_TABLE | FILL_HISTORY[(start_time)] | FILL_HISTORY_FIRST[(start_time)] | CALC_NOTIFY_ONLY | LOW_LATENCY_CALC | PRE_FILTER(expr) | FORCE_OUTPUT | MAX_DELAY(delay_time) | EVENT_TYPE(event_types) | IGNORE_NODATA_TRIGGER}
```
Control options are used to manage trigger and computation behavior. Multiple options can be specified, but the same option cannot be specified more than once. The available options include:
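As an illustration of combining several options, a stream with a watermark, a pre-filter, and a maximum result delay might be declared as follows (all table and column names are hypothetical):

```sql
-- Tolerate 10 seconds of out-of-order data, consider only rows with positive
-- voltage for triggering, and cap result latency with MAX_DELAY.
CREATE STREAM power.tolerant_stream
  INTERVAL(1h) SLIDING(1h)
  FROM power.meters
  PARTITION BY tbname
  STREAM_OPTIONS(WATERMARK(10s) | PRE_FILTER(voltage > 0) | MAX_DELAY(5s))
  INTO power.tolerant_result
  AS SELECT _twstart AS wstart, AVG(voltage) AS avg_voltage
     FROM %%trows;
```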
For example, PRE_FILTER(col1 > 0) ensures that only rows where col1 is positive will be evaluated for triggering. Default: if not specified, no pre-filtering is applied.

Event notifications are optional actions executed after a stream is triggered. Notifications can be sent to applications over the WebSocket protocol. Users define notifications through a notification_definition, which specifies the events to be notified and the target address for receiving messages. The notification content may include the computation results, or, when no result is produced, only the event-related information.
```sql
[notification_definition]

notification_definition:
    NOTIFY(url [, ...]) [ON (event_types)] [WHERE condition] [NOTIFY_OPTIONS(notify_option [| notify_option])]

event_types:
    event_type [| event_type]

event_type: {WINDOW_OPEN | WINDOW_CLOSE | ON_TIME}
```
Details:
For example: ws://localhost:8080, ws://localhost:8080/notify, ws://localhost:8080/notify?key=foo.

When a specified event is triggered, taosd sends a POST request to the configured URL. The message body is in JSON format. A single request may contain events from multiple streams, and the event types may vary. The event information included depends on the window type:
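A stream that sends both open and close events to a local WebSocket listener might be declared as follows (the URL, schema, and names are illustrative):

```sql
-- Send window open/close events to a WebSocket endpoint while also
-- storing the computed results.
CREATE STREAM power.notify_stream
  INTERVAL(1m) SLIDING(1m)
  FROM power.meters
  PARTITION BY tbname
  NOTIFY('ws://localhost:8080/notify') ON (WINDOW_OPEN|WINDOW_CLOSE)
  INTO power.notify_result
  AS SELECT _twstart AS wstart, AVG(current) AS avg_current
     FROM %%trows;
```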
An example structure of a notification message is shown below:
```json
{
  "messageId": "unique-message-id-12345",
  "timestamp": 1733284887203,
  "streams": [
    {
      "streamName": "avg_current_stream",
      "events": [
        {
          "tableName": "t_a667a16127d3b5a18988e32f3e76cd30",
          "eventType": "WINDOW_OPEN",
          "eventTime": 1733284887097,
          "triggerId": "window-id-67890",
          "triggerType": "Interval",
          "groupId": "2650968222368530754",
          "windowStart": 1733284800000
        },
        {
          "tableName": "t_a667a16127d3b5a18988e32f3e76cd30",
          "eventType": "WINDOW_CLOSE",
          "eventTime": 1733284887197,
          "triggerId": "window-id-67890",
          "triggerType": "Interval",
          "groupId": "2650968222368530754",
          "windowStart": 1733284800000,
          "windowEnd": 1733284860000,
          "result": {
            "_wstart": 1733284800000,
            "avg(current)": 1.3
          }
        }
      ]
    },
    {
      "streamName": "max_voltage_stream",
      "events": [
        {
          "tableName": "t_96f62b752f36e9b16dc969fe45363748",
          "eventType": "WINDOW_OPEN",
          "eventTime": 1733284887231,
          "triggerId": "window-id-13579",
          "triggerType": "Event",
          "groupId": "7533998559487590581",
          "windowStart": 1733284800000,
          "triggerCondition": {
            "conditionIndex": 0,
            "fieldValue": {
              "c1": 10,
              "c2": 15
            }
          }
        },
        {
          "tableName": "t_96f62b752f36e9b16dc969fe45363748",
          "eventType": "WINDOW_CLOSE",
          "eventTime": 1733284887231,
          "triggerId": "window-id-13579",
          "triggerType": "Event",
          "groupId": "7533998559487590581",
          "windowStart": 1733284800000,
          "windowEnd": 1733284810000,
          "triggerCondition": {
            "conditionIndex": 1,
            "fieldValue": {
              "c1": 20,
              "c2": 3
            }
          },
          "result": {
            "_wstart": 1733284800000,
            "max(voltage)": 220
          }
        }
      ]
    }
  ]
}
```
The following sections describe each field in the notification message.
These fields are shared by all event objects:
These fields apply when triggerType is Period.
These fields apply when triggerType is Sliding.
These fields apply when triggerType is Interval.
These fields apply only when triggerType is State.
These fields apply only when triggerType is Session.
These fields apply only when triggerType is Event.
These fields apply only when triggerType is Count.
During stream processing, out-of-order data, updates, or deletions may cause an already generated window to be removed or require its results to be recalculated. In such cases, a WINDOW_INVALIDATION notification is sent to the target address to indicate which windows have been deleted.
These fields apply only when eventType is WINDOW_INVALIDATION.
This operation deletes only the stream processing task. Data written by the stream processing task will not be deleted.
```sql
DROP STREAM [IF EXISTS] [db_name.]stream_name [, [db_name.]stream_name] ...
```
Displays the stream processing tasks in the current database or in a specified database.
```sql
SHOW [db_name.]STREAMS;
```
For more detailed information, query the system table information_schema.ins_streams:
```sql
SELECT * FROM information_schema.`ins_streams`;
```
When a stream is running, it is executed as multiple tasks. Detailed task information can be obtained from the system table information_schema.ins_stream_tasks:
```sql
SELECT * FROM information_schema.`ins_stream_tasks`;
```
```sql
START STREAM [IF EXISTS] [IGNORE UNTREATED] [db_name.]stream_name;
```
Notes:
```sql
STOP STREAM [IF EXISTS] [db_name.]stream_name;
```
Notes:
Stream processing in TDengine is architected with a separation of compute and storage, which requires that at least one snode be deployed in the system. Except for data reads, all stream processing functions run exclusively on snodes.
Before creating a stream processing task, an snode must be deployed. The syntax is as follows:
```sql
CREATE SNODE ON DNODE dnode_id;
```
You can view information about snodes with the following command:
```sql
SHOW SNODES;
```
For more detailed information, use:
```sql
SELECT * FROM information_schema.`ins_snodes`;
```
When you delete an snode, both the snode and its replica must be online to synchronize stream state information. If either the snode or its replica is offline, the deletion will fail.
```sql
DROP SNODE ON DNODE dnode_id;
```
Permission control for stream processing is tied only to database-level permissions. Since each stream may be associated with multiple databases, the requirements are as follows:
| Associated Database | Count | Auth Action | Required Permission |
|---|---|---|---|
| Database where the stream is defined | 1 | Create, delete, stop, start, manual recomputation | Write |
| Database of the trigger table | 1 | Create | Read |
| Database of the output table | 1 | Create | Write |
| Databases of the computation sources | 1 or more | Create | Read |
Most TDengine TSDB window types are associated with primary key columns. For example, event windows rely on data ordered by primary key to determine when to open and close a window. When using window-based triggers, it is important that trigger table data be written in an orderly fashion, as this ensures the highest efficiency in stream processing. If out-of-order data is written, it may affect the correctness of results for windows that have already been triggered. Similarly, updates and deletions can also compromise result correctness.
TDengine supports the use of WATERMARK to mitigate issues caused by out-of-order data, updates, and deletions. A WATERMARK is a user-defined duration based on event time that represents the system’s progress in stream processing, reflecting the user’s tolerance for out-of-order data. The current watermark is defined as latest processed event time – WATERMARK interval. Only data with event times earlier than the current watermark are eligible for trigger evaluation. Likewise, only windows or other trigger conditions whose time boundaries are earlier than the current watermark will be triggered. Note: WATERMARK does not apply to PERIOD (scheduled) triggers. In PERIOD mode, no recalculation is performed.
For out-of-order, update, or delete scenarios that exceed the WATERMARK, recalculation is used to ensure the correctness of results. Recalculation means re-triggering and re-executing computations for the data range affected by out-of-order, updated, or deleted records. The results already written to the output table are not deleted; instead, new results are written again. To make this approach effective, users must ensure that their computation statements and source tables are independent of processing time—that is, the same trigger should produce valid results even if executed multiple times.
Recalculation can be either automatic or manual. If automatic recalculation is not needed, it can be disabled via configuration options.
Manual recalculation must be explicitly initiated by the user and can be started with an SQL command when needed.
```sql
RECALCULATE STREAM [db_name.]stream_name FROM start_time [TO end_time];
```
Notes:
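For example, to recompute one day of results for a stream (the stream name and time range are illustrative):

```sql
-- Re-trigger computations for the affected event-time range; existing results
-- in the output table are overwritten rather than deleted.
RECALCULATE STREAM power.avg_current_stream
  FROM '2025-06-01 00:00:00' TO '2025-06-02 00:00:00';
```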
Out-of-order data refers to records written to the trigger table in a non-sequential order. While the computation itself does not depend on whether the source table is ordered, users must ensure—based on business requirements—that the source table’s data is fully written before a trigger occurs. The impact of out-of-order data and how it is handled vary depending on the trigger type.
| Trigger Type | Impact and Handling |
|---|---|
| Periodic trigger | |
| Sliding trigger | |
| Count window trigger | Ignored; no processing performed. |
| Other window triggers | Default: Handled through recalculation. Optional: Ignored; no processing performed. |
Data updates refer to multiple writes of records with the same timestamp, where other column values may or may not change. Update operations affect only the trigger table and the triggering behavior—they do not directly affect the computation process itself. The impact of data updates and how they are handled vary depending on the trigger type.
| Trigger Type | Impact and Handling |
|---|---|
| Periodic trigger | |
| Sliding trigger | |
| Count window trigger | Ignored; no processing performed. |
| Other window triggers | Treated as out-of-order data and handled through recalculation. |
Data deletions affect only the trigger table and the triggering behavior—they do not directly impact the computation process itself. The impact of data deletions and how they are handled vary depending on the trigger type.
| Trigger Type | Impact and Handling |
|---|---|
| Periodic trigger | |
| Sliding trigger | |
| Count window trigger | Ignored; no processing performed. |
| Other window triggers | Default: Ignored; no processing performed. Optional: Treated as out-of-order data and handled through recalculation. |
The expired_time setting defines a data expiration interval. For each group generated by a stream trigger, the system determines whether new data is expired by comparing the event time of the latest data against the expiration threshold. The threshold is calculated as: latest event time - expired_time. All data earlier than this threshold is treated as expired.
After a stream is created, users may perform operations on the databases and tables associated with the stream. The effects of these operations on the stream and how the stream handles them are summarized as follows:
| Operation | Operation Impact and Stream Handling |
|---|---|
| User creates a new child table under a trigger supertable (non-virtual) and writes data | The new child table is automatically included in the current stream processing, either joining an existing group or creating a new one. |
| User creates a new child table under a virtual trigger supertable and writes data | Ignored; no additional handling. |
| User deletes a child table of the trigger supertable | Default: Ignored. Optional: Certain trigger types can be configured to recalculate automatically, or to delete the corresponding result table (applies only to streams grouped by child table). |
| User deletes the trigger table | Ignored; no additional handling. |
| User adds a column to the trigger table | Ignored; no additional handling. |
| User deletes a column from the trigger table | Ignored; no additional handling. |
| User modifies the tag value of a child table under the trigger supertable | If the tag column is used by the stream as a grouping key, the operation is not allowed and results in an error. Otherwise, ignored. |
| User modifies the schema of the trigger table columns | Ignored; no additional handling. (An error will be raised when a schema mismatch is detected at read time.) |
| User modifies or deletes a source table | Ignored; no additional handling. |
| User modifies or deletes an output table | Ignored; no additional handling. (If a schema mismatch is detected at write time, an error is raised. If the table does not exist, it will be recreated.) |
| User splits a vnode | Not allowed if the database containing the vnode is a source database or a trigger table database. Not allowed if virtual tables are used for triggers or computations. The user may force execution with SPLIT VGROUP N FORCE after confirming there is no impact. |
| User deletes a database | Not allowed if the deleted database is a source database of a stream, or a trigger table database that is not the same as the stream's own database. Not allowed if the stream involves triggers or computations on virtual tables from non-target databases. The user may force execution after confirming no impact, using `DROP DATABASE name FORCE`. |
Apart from the operations explicitly restricted or specially handled in the table above, all other operations, including those marked as "ignored; no additional handling", are unrestricted. However, if such an operation may affect stream computation, it is the user's responsibility to decide how to proceed: either accept the impact, or perform a manual recalculation to restore correctness.
Stream processing–related configuration parameters are listed below. For full details, see the `taosd` configuration reference.
The following rules and limitations apply to stream processing:
Temporary Restrictions:
Compared with version 3.3.6.0, stream processing has been completely redesigned. Before upgrading from the old version, the following steps must be performed, after which streams should be recreated under the new stream processing version:
Note: If the above steps are not performed, taosd will fail to start.
The redesigned stream processing engine offers greater flexibility and removes many previous limitations. While this improves usability, it also places higher demands on using streams correctly.
Before creating a stream, users should carefully review the following key checkpoints. Once clarified, streams can be designed and used accordingly:
In the stream status display (query the table `information_schema.ins_streams`), several detailed status indicators are listed. Examples include whether real-time computations are keeping up with progress, how many recalculations (and what ratio) have occurred, and any error messages. Users and administrators should monitor this information to determine whether stream processing is functioning normally, and use it as a basis for analysis and optimization.
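For example, a quick health check over all streams might look like the sketch below. Only `stream_name` and `status` are assumed here; the exact column set of `ins_streams` depends on your version, so verify the schema before relying on specific fields:

```sql
-- Sketch: list streams that are not in a healthy running state.
-- Column names are assumptions; verify them with
-- DESCRIBE information_schema.ins_streams on your version.
SELECT stream_name, status
FROM information_schema.ins_streams
WHERE status <> 'Running';
```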
CREATE STREAM sm1 COUNT_WINDOW(1) FROM tb1
INTO tb3 AS
SELECT _twstart, avg(col1) FROM tb2
WHERE _c0 >= _twend - 5m AND _c0 <= _twend;
CREATE STREAM sm2 COUNT_WINDOW(10, 1, col1) FROM tb1
STREAM_OPTIONS(CALC_NOTIFY_ONLY | PRE_FILTER(col1 > 0))
NOTIFY("ws://localhost:8080/notify") ON (WINDOW_CLOSE)
AS
SELECT avg(col1) FROM %%trows;
CREATE STREAM `idmp`.`ana_temp` EVENT_WINDOW(START WITH `temp` > 80 END WITH `temp` <= 80) TRUE_FOR(10m) FROM `idmp`.`vt_envsens02_471544`
STREAM_OPTIONS( IGNORE_DISORDER)
INTO `idmp`.`ana_temp`
AS
SELECT _twstart+0s AS output_timestamp, avg(`temp`) AS `avgtemp` FROM idmp.`vt_envsens02_471544` WHERE ts >= _twstart AND ts <= _twend;
CREATE STREAM sm1 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname
WHERE _c0 >= _twstart AND _c0 <= _twend;
In the SQL above, FROM %%tbname WHERE _c0 >= _twstart AND _c0 <= _twend and FROM %%trows are not equivalent. The former reads the trigger group's corresponding table within the window's time range at computation time, so the in-window rows may differ from what existed at trigger time. The latter computes only over the window data captured at the moment of triggering.
CREATE STREAM sm2 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(MAX_DELAY(1m) | FILL_HISTORY_FIRST)
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname WHERE _c0 >= _twstart AND _c0 <= _twend;
CREATE STREAM avg_stream INTERVAL(1m) SLIDING(1m) FROM meters
NOTIFY ('ws://localhost:8080/notify', 'wss://192.168.1.1:8080/notify?key=foo') ON ('WINDOW_OPEN', 'WINDOW_CLOSE') NOTIFY_OPTIONS(NOTIFY_HISTORY | ON_FAILURE_PAUSE)
INTO avg_stb
AS
SELECT _twstart, _twend, AVG(current) FROM %%trows;
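On the receiving side, the notification endpoint is a WebSocket server that receives JSON messages from the stream. The sketch below shows only the payload-handling step; the field names (`streamName`, `eventType`, `eventTime`) are illustrative assumptions rather than the documented message format, so adapt them to the schema your version actually emits:

```python
import json

# Hypothetical payload shape: the real notification schema is defined by
# TDengine TSDB; the field names below are illustrative assumptions.
def summarize_notification(raw: str) -> str:
    """Return a one-line summary of a stream notification message."""
    msg = json.loads(raw)
    return f"{msg['streamName']}: {msg['eventType']} at {msg['eventTime']}"

# Example message as a WebSocket handler might receive it.
sample = json.dumps({
    "streamName": "avg_stream",
    "eventType": "WINDOW_CLOSE",
    "eventTime": 1735689600000,
})
print(summarize_notification(sample))  # avg_stream: WINDOW_CLOSE at 1735689600000
```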
CREATE STREAM sm1 PERIOD(1h)
INTO tb2
AS
SELECT cast(_tlocaltime/1000000 AS TIMESTAMP), count(*) FROM tb1;
CREATE STREAM sm1 PERIOD(1h)
NOTIFY("ws://localhost:8080/notify");
CREATE STREAM stream_consumer_energy
PERIOD(1d)
FROM meters PARTITION BY tbname, groupid, location
INTO meters_1d (ts, sum_power)
TAGS (groupid INT AS groupid, location VARCHAR(24) AS location)
AS
SELECT cast(_tlocaltime/1000000 AS TIMESTAMP), sum(current*voltage) AS sum_power
FROM meters
WHERE ts >= cast(_tprev_localtime/1000000 AS timestamp) AND ts <= cast(_tlocaltime/1000000 AS timestamp);
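The `cast(_tlocaltime/1000000 AS TIMESTAMP)` expressions above assume `_tlocaltime` carries a nanosecond epoch value, so integer-dividing by 1,000,000 yields a millisecond epoch suitable for a millisecond-precision TIMESTAMP column. A minimal sketch of that conversion (the sample value is hypothetical):

```python
from datetime import datetime, timezone

# Hypothetical nanosecond epoch value, as _tlocaltime would provide it.
t_localtime_ns = 1_735_689_600_000_000_000  # 2025-01-01 00:00:00 UTC

# Same conversion as cast(_tlocaltime/1000000 AS TIMESTAMP): ns -> ms.
t_localtime_ms = t_localtime_ns // 1_000_000
print(t_localtime_ms)  # 1735689600000

# Sanity check: the millisecond value maps back to the expected instant.
print(datetime.fromtimestamp(t_localtime_ms / 1000, tz=timezone.utc).isoformat())
# 2025-01-01T00:00:00+00:00
```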