ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.
<Note>
Available on the Enterprise plan. Contact us for details.
</Note>

See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:
<iframe width="100%" height="400" src="https://www.youtube.com/embed/RD_HZ7xE8G0" title="YouTube video" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

<Info>
In this video, the SQL API is used to connect to Power BI. Currently, it's recommended to use the DAX API.
</Info>

If you are using Confluent Cloud, you need to generate an API key and use the API key name as your username and the API key secret as your password.
You can generate an API key by installing confluent-cli and running the
following commands in the command line:
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>
Add the following to a .env file in your Cube project:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password
| Environment Variable | Description | Possible Values | Required |
|---|---|---|---|
| `CUBEJS_DB_URL` | The host URL for ksqlDB, including the port | A valid database host URL | ✅ |
| `CUBEJS_DB_USER` | The username used to connect to ksqlDB. API key for Confluent Cloud. | A valid database username | ✅ |
| `CUBEJS_DB_PASS` | The password used to connect to ksqlDB. API secret for Confluent Cloud. | A valid database password | ✅ |
| `CUBEJS_DB_KAFKA_HOST` | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | ❌ |
| `CUBEJS_DB_KAFKA_USER` | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | ❌ |
| `CUBEJS_DB_KAFKA_PASS` | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | ❌ |
| `CUBEJS_DB_KAFKA_USE_SSL` | If `true`, enables SASL_SSL for the Kafka connection | `true`, `false` | ❌ |
| `CUBEJS_CONCURRENCY` | The number of concurrent queries to the data source | A valid number | ❌ |
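To check credentials before wiring them into Cube, it can help to see what a request to the ksqlDB REST API looks like. The sketch below only builds the request and does not send it. `buildKsqlRequest` is a hypothetical helper, not part of Cube, and it assumes HTTP Basic authentication with the API key as the username and the API secret as the password.

```javascript
// Hypothetical helper (not part of Cube): builds, but does not send, a request
// for the ksqlDB REST API /ksql endpoint from the environment variables above.
function buildKsqlRequest(env, statement) {
  // Assumption: HTTP Basic auth, API key name as username, secret as password.
  const credentials = Buffer.from(
    `${env.CUBEJS_DB_USER}:${env.CUBEJS_DB_PASS}`
  ).toString("base64");

  return {
    url: new URL("/ksql", env.CUBEJS_DB_URL).toString(),
    method: "POST",
    headers: {
      Authorization: `Basic ${credentials}`,
      "Content-Type": "application/vnd.ksql.v1+json",
    },
    body: JSON.stringify({ ksql: statement, streamsProperties: {} }),
  };
}

const request = buildKsqlRequest(
  {
    CUBEJS_DB_URL: "https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443",
    CUBEJS_DB_USER: "username",
    CUBEJS_DB_PASS: "password",
  },
  "SHOW STREAMS;"
);
console.log(request.method, request.url);
```

Passing the resulting object to an HTTP client of your choice should list the streams visible to that API key if the credentials are valid.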
ksqlDB supports only streaming pre-aggregations.
By default, Cube connects to ksqlDB via its REST API. Cube uses the REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.
In this default mode, Cube may create tables and streams in ksqlDB as part
of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT
statements for non-read-only pre-aggregations).
When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.
In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.
Kafka streams mode is useful when:
Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=true
Multiple Kafka brokers can be specified as a comma-separated list:
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092
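The activation rule above can be sketched as a small function that inspects the environment. `describeKsqlMode` and the mode labels it returns are hypothetical names used for illustration only; they are not part of Cube.

```javascript
// Hypothetical helper (not part of Cube): mirrors the rule that setting
// CUBEJS_DB_KAFKA_HOST switches the data source to Kafka streams mode.
function describeKsqlMode(env) {
  const raw = env.CUBEJS_DB_KAFKA_HOST || "";
  // Brokers are a comma-separated list; ignore surrounding whitespace.
  const brokers = raw
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  return {
    mode: brokers.length > 0 ? "kafka-streams" : "rest-api",
    brokers,
    useSsl: env.CUBEJS_DB_KAFKA_USE_SSL === "true",
  };
}

const streamsMode = describeKsqlMode({
  CUBEJS_DB_KAFKA_HOST: "broker1:9092,broker2:9092,broker3:9092",
  CUBEJS_DB_KAFKA_USE_SSL: "true",
});
const defaultMode = describeKsqlMode({});
console.log(streamsMode.mode, streamsMode.brokers.length, defaultMode.mode);
```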
When using Confluent Cloud,
the Kafka credentials are separate from the ksqlDB credentials. Generate
an API key for the Kafka cluster (not the ksqlDB cluster) and use it as
CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.
With Kafka streams mode enabled:

- Cube uses ksqlDB only for metadata, which it reads with `DESCRIBE`.
- Cube does not send `CREATE TABLE` or `CREATE STREAM` statements to ksqlDB.

ksqlDB is typically used as an additional data source alongside a primary
data warehouse. To use Kafka streams mode, configure ksqlDB as a named
data source using decorated environment variables
and point your cubes to it with the
data_source property.
First, declare the data sources and configure the ksqlDB connection with Kafka credentials:
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
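The naming pattern in the variables above (`CUBEJS_DB_*` becomes `CUBEJS_DS_KSQL_DB_*`) can be expressed as a small function. `decorateEnvVar` is a hypothetical helper written for this page, inferred from the example; it is not an API that Cube exposes.

```javascript
// Hypothetical helper: derives the decorated variable name for a named data
// source from the undecorated name, following the pattern in the example above.
function decorateEnvVar(dataSource, name) {
  const prefix = "CUBEJS_";
  if (!name.startsWith(prefix)) {
    throw new Error(`Expected a ${prefix}* variable, got ${name}`);
  }
  // The default data source keeps the undecorated names.
  if (dataSource === "default") {
    return name;
  }
  return `CUBEJS_DS_${dataSource.toUpperCase()}_${name.slice(prefix.length)}`;
}

console.log(decorateEnvVar("ksql", "CUBEJS_DB_KAFKA_HOST"));
console.log(decorateEnvVar("default", "CUBEJS_DB_TYPE"));
```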
Then, create cubes that reference your data. A common pattern is to combine a batch cube (reading historical data from your warehouse) with a streaming cube (reading real-time data from ksqlDB via Kafka) using a lambda pre-aggregation.
The batch cube queries the warehouse and builds daily partitions
incrementally. The streaming cube points at an existing ksqlDB stream
with data_source: ksql and uses a read-only streaming pre-aggregation
that consumes from the backing Kafka topic directly. The lambda
pre-aggregation in the batch cube merges both, serving historical data
from the warehouse rollup and real-time data from the streaming rollup:
cubes:
  - name: order_events
    data_source: default
    sql: >
      SELECT
        order_id,
        user_id,
        status,
        amount,
        created_at
      FROM ecommerce.order_events
      WHERE {FILTER_PARAMS.order_events.created_at.filter(
        (from, to) =>
          `created_at >= ${from} AND created_at < ${to}`
      )}
    measures:
      - name: count
        type: count
      - name: total_amount
        sql: amount
        type: sum
      - name: failed_count
        sql: "CASE WHEN status = 'failed' THEN 1 ELSE 0 END"
        type: sum
    dimensions:
      - name: order_id
        sql: order_id
        type: string
        primary_key: true
      - name: user_id
        sql: user_id
        type: string
      - name: status
        sql: status
        type: string
      - name: created_at
        sql: created_at
        type: time
    pre_aggregations:
      - name: lambda
        type: rollup_lambda
        rollups:
          - order_events.batch
          - order_events_stream.stream
      - name: batch
        type: rollup
        measures:
          - CUBE.count
          - CUBE.total_amount
          - CUBE.failed_count
        dimensions:
          - CUBE.order_id
          - CUBE.user_id
          - CUBE.status
        time_dimension: CUBE.created_at
        granularity: second
        partition_granularity: day
        build_range_start:
          sql: SELECT NOW() - INTERVAL '90 days'
        build_range_end:
          sql: SELECT NOW()
        refresh_key:
          every: 8 hour
          update_window: 1 day
          incremental: true
        indexes:
          - name: user_status
            columns:
              - CUBE.user_id
              - CUBE.status
  - name: order_events_stream
    data_source: ksql
    sql: "SELECT * FROM ORDER_EVENTS_STREAM"
    measures:
      - name: count
        type: count
      - name: total_amount
        sql: AMOUNT
        type: sum
      - name: failed_count
        sql: "CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END"
        type: sum
    dimensions:
      - name: order_id
        sql: ORDER_ID
        type: string
        primary_key: true
      - name: user_id
        sql: USER_ID
        type: string
      - name: status
        sql: STATUS
        type: string
      - name: created_at
        sql: CREATED_AT
        type: time
    pre_aggregations:
      - name: stream
        type: rollup
        read_only: true
        measures:
          - CUBE.count
          - CUBE.total_amount
          - CUBE.failed_count
        dimensions:
          - CUBE.order_id
          - CUBE.user_id
          - CUBE.status
        unique_key_columns:
          - order_id
          - user_id
          - status
          - created_at_second
        time_dimension: CUBE.created_at
        granularity: second
        partition_granularity: day
        build_range_start:
          sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))"
        build_range_end:
          sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')"
        refresh_key:
          every: 1 minute
          update_window: 1 hour
          incremental: true
        indexes:
          - name: user_status
            columns:
              - CUBE.user_id
              - CUBE.status
              - order_events_stream__created_at_second
        stream_offset: latest
        output_column_types:
          - member: CUBE.order_id
            type: text
          - member: CUBE.user_id
            type: text
          - member: CUBE.status
            type: text
          - member: CUBE.created_at.second
            type: timestamp
          - member: CUBE.count
            type: int
          - member: CUBE.total_amount
            type: decimal
          - member: CUBE.failed_count
            type: int
cube("order_events", {
  data_source: "default",
  sql: `
    SELECT
      order_id,
      user_id,
      status,
      amount,
      created_at
    FROM ecommerce.order_events
    WHERE ${FILTER_PARAMS.order_events.created_at.filter(
      (from, to) => `created_at >= ${from} AND created_at < ${to}`
    )}
  `,
  measures: {
    count: {
      type: `count`,
    },
    total_amount: {
      sql: `amount`,
      type: `sum`,
    },
    failed_count: {
      sql: `CASE WHEN status = 'failed' THEN 1 ELSE 0 END`,
      type: `sum`,
    },
  },
  dimensions: {
    order_id: {
      sql: `order_id`,
      type: `string`,
      primary_key: true,
    },
    user_id: {
      sql: `user_id`,
      type: `string`,
    },
    status: {
      sql: `status`,
      type: `string`,
    },
    created_at: {
      sql: `created_at`,
      type: `time`,
    },
  },
  pre_aggregations: {
    lambda: {
      type: `rollup_lambda`,
      rollups: [
        order_events.batch,
        order_events_stream.stream,
      ],
    },
    batch: {
      type: `rollup`,
      measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
      dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
      time_dimension: CUBE.created_at,
      granularity: `second`,
      partition_granularity: `day`,
      build_range_start: {
        sql: `SELECT NOW() - INTERVAL '90 days'`,
      },
      build_range_end: {
        sql: `SELECT NOW()`,
      },
      refresh_key: {
        every: `8 hour`,
        update_window: `1 day`,
        incremental: true,
      },
      indexes: {
        user_status: {
          columns: [CUBE.user_id, CUBE.status],
        },
      },
    },
  },
});
cube("order_events_stream", {
  data_source: "ksql",
  sql: `SELECT * FROM ORDER_EVENTS_STREAM`,
  measures: {
    count: {
      type: `count`,
    },
    total_amount: {
      sql: `AMOUNT`,
      type: `sum`,
    },
    failed_count: {
      sql: `CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END`,
      type: `sum`,
    },
  },
  dimensions: {
    order_id: {
      sql: `ORDER_ID`,
      type: `string`,
      primary_key: true,
    },
    user_id: {
      sql: `USER_ID`,
      type: `string`,
    },
    status: {
      sql: `STATUS`,
      type: `string`,
    },
    created_at: {
      sql: `CREATED_AT`,
      type: `time`,
    },
  },
  pre_aggregations: {
    stream: {
      type: `rollup`,
      read_only: true,
      measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
      dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
      unique_key_columns: [
        `order_id`,
        `user_id`,
        `status`,
        `created_at_second`,
      ],
      time_dimension: CUBE.created_at,
      granularity: `second`,
      partition_granularity: `day`,
      build_range_start: {
        sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`,
      },
      build_range_end: {
        sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`,
      },
      refresh_key: {
        every: `1 minute`,
        update_window: `1 hour`,
        incremental: true,
      },
      indexes: {
        user_status: {
          columns: [
            CUBE.user_id,
            CUBE.status,
            `order_events_stream__created_at_second`,
          ],
        },
      },
      stream_offset: `latest`,
      output_column_types: [
        { member: CUBE.order_id, type: `text` },
        { member: CUBE.user_id, type: `text` },
        { member: CUBE.status, type: `text` },
        { member: CUBE.created_at.second, type: `timestamp` },
        { member: CUBE.count, type: `int` },
        { member: CUBE.total_amount, type: `decimal` },
        { member: CUBE.failed_count, type: `int` },
      ],
    },
  },
});
Key properties for the streaming pre-aggregation:

- `read_only: true`: Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.
- `stream_offset`: controls where Cube Store starts consuming from in the Kafka topic. Set to `"latest"` to only consume new messages arriving after the pre-aggregation is created. Set to `"earliest"` to replay the topic from the beginning. Defaults to `"latest"` if not specified. On subsequent refreshes, Cube Store automatically resumes from the last processed offset regardless of this setting.
- `unique_key_columns`: columns that uniquely identify a record, used for deduplication (see below).
- `output_column_types`: declares the output column types for the Cube Store table, required for Kafka streams mode (see below).

For the streaming pre-aggregation to work in read-only mode, the
generated SQL must not contain a `GROUP BY` clause, because Cube Store's
stream post-processing engine does not support aggregation.
Cube automatically omits the GROUP BY clause when the dimensions
included in the pre-aggregation contain a primary key. In that case, the
generated query becomes a simple SELECT ... FROM ... without grouping,
and measures are passed through as raw expressions rather than
aggregated. This is what makes the pre-aggregation eligible for the
read-only streaming path.
You must include all primary key columns of the cube in the
streaming pre-aggregation's dimensions list. If any primary key
dimension is missing, the query may not be recognized as ungrouped
and will fail to use the streaming path.
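A quick way to catch a missing primary key before deploying is to compare the two lists. The sketch below uses plain objects with string member names as stand-ins for a cube and its rollup; `missingPrimaryKeys` is a hypothetical helper, not something Cube provides.

```javascript
// Hypothetical check (not part of Cube): lists primary key dimensions that are
// absent from a streaming pre-aggregation's dimensions list.
function missingPrimaryKeys(cube, preAggregation) {
  const primaryKeys = Object.entries(cube.dimensions)
    .filter(([, dimension]) => dimension.primary_key === true)
    .map(([name]) => name);
  const included = new Set(preAggregation.dimensions);
  return primaryKeys.filter((name) => !included.has(name));
}

// Simplified stand-in for the order_events_stream cube shown earlier.
const streamCube = {
  dimensions: {
    order_id: { sql: "ORDER_ID", type: "string", primary_key: true },
    user_id: { sql: "USER_ID", type: "string" },
    status: { sql: "STATUS", type: "string" },
  },
};

const complete = missingPrimaryKeys(streamCube, {
  dimensions: ["order_id", "user_id", "status"],
});
const incomplete = missingPrimaryKeys(streamCube, {
  dimensions: ["user_id", "status"],
});
console.log(complete, incomplete);
```

An empty result means every primary key is present, so the rollup is a candidate for the ungrouped, read-only path described above.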
The sql_table or sql value should reference an existing ksqlDB stream
or table. Cube discovers its schema automatically. With Kafka streams
mode enabled, the streaming pre-aggregation reads the backing Kafka topic
directly — no objects are created in ksqlDB.
In Kafka streams mode, Cube Store parses the select_statement
(generated from the cube's sql property) and matches the FROM table
name against the actual Kafka topic name. On managed platforms like
Confluent Cloud, the Kafka topic name often differs from the ksqlDB
stream or table name — for example, a ksqlDB stream called
ORDER_EVENTS_STREAM might be backed by a Kafka topic named
pksqlc-abc123ORDER_EVENTS_STREAM.
The cube's sql property must reference the ksqlDB stream or table
name (not the Kafka topic), because Cube uses ksqlDB DESCRIBE to
discover the schema and resolve the backing topic. However, Cube does
not currently rewrite the FROM clause in the generated
select_statement to use the resolved Kafka topic name. If the ksqlDB
object name differs from the Kafka topic name, Cube Store will fail
with:
<Warning>
Topic table ORDER_EVENTS_STREAM is not found

This is a known limitation of Kafka streams mode. It does not occur when the ksqlDB object name and the Kafka topic name are the same, which is the default behavior when ksqlDB creates a stream or table with the default topic naming strategy.
</Warning>

When unique_key_columns is set, Cube Store appends an internal
sequence column (__seq) to the table, populated from the Kafka
partition offset. The unique key columns together with __seq form the
sort key for all indexes on this table.
Entries in unique_key_columns are strings, not member references. Use
the dimension's own name (for example, user_id). To include the time
dimension as part of the unique key, use the form
<time_dimension_name>_<granularity> — for example, created_at_second
when granularity is second.
The naming convention for the time dimension inside unique_key_columns
is not the same as the column name used in
indexes. Indexes
expect the fully-qualified alias from the generated select_statement
(<cube_name>__<time_dimension_name>_<granularity>), while
unique_key_columns expects only <time_dimension_name>_<granularity>.
If the pre-aggregation defines indexes, every dimension referenced by
any index must also be listed in unique_key_columns. Cube Store uses
unique_key_columns (plus __seq) as the sort key for all indexes, so
an index column that is not part of the unique key cannot be sorted on
during compaction and the pre-aggregation build will fail.
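The two naming conventions, and the rule that index columns must be covered by the unique key, can be sketched as follows. All three functions are hypothetical helpers for illustration; dimension references are represented as plain strings rather than `CUBE` member references.

```javascript
// Name of the time dimension inside unique_key_columns.
function uniqueKeyTimeColumn(timeDimension, granularity) {
  return `${timeDimension}_${granularity}`;
}

// Fully-qualified alias of the time dimension as used in indexes.
function indexTimeColumn(cubeName, timeDimension, granularity) {
  return `${cubeName}__${timeDimension}_${granularity}`;
}

// Returns index columns that are not covered by unique_key_columns.
function indexColumnsMissingFromUniqueKey(cubeName, indexColumns, uniqueKeyColumns) {
  const prefix = `${cubeName}__`;
  const unique = new Set(uniqueKeyColumns);
  return indexColumns
    // Strip the cube prefix so both lists use the same short names.
    .map((column) => (column.startsWith(prefix) ? column.slice(prefix.length) : column))
    .filter((column) => !unique.has(column));
}

const uniqueKeyColumns = [
  "order_id",
  "user_id",
  "status",
  uniqueKeyTimeColumn("created_at", "second"),
];
const indexColumns = [
  "user_id",
  "status",
  indexTimeColumn("order_events_stream", "created_at", "second"),
];
const uncovered = indexColumnsMissingFromUniqueKey(
  "order_events_stream",
  indexColumns,
  uniqueKeyColumns
);
console.log(uniqueKeyColumns[3], indexColumns[2], uncovered);
```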
Deduplication is not applied at ingestion time — all incoming records are
appended as they arrive. Instead, Cube Store deduplicates during
reads and compaction: rows are sorted by the unique key columns
and then by __seq, and only the last row per unique key (the one
with the highest sequence number) is kept. This means that if the same
key appears multiple times in the stream, the most recent version is
always the one returned by queries.
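The "last row per unique key wins" behavior can be illustrated with a few lines of JavaScript. This is only a model of the semantics: it uses a map instead of the sort that Cube Store performs, and `dedupeLatest` is a hypothetical name.

```javascript
// Illustrative model (not Cube Store's implementation): keep, for each unique
// key, the row with the highest __seq value.
function dedupeLatest(rows, uniqueKeyColumns) {
  const latest = new Map();
  for (const row of rows) {
    const key = JSON.stringify(uniqueKeyColumns.map((column) => row[column]));
    const current = latest.get(key);
    if (current === undefined || row.__seq > current.__seq) {
      latest.set(key, row);
    }
  }
  return [...latest.values()];
}

// Three appended records; ord_1 appears twice with different statuses.
const appended = [
  { order_id: "ord_1", status: "pending", __seq: 1 },
  { order_id: "ord_2", status: "pending", __seq: 2 },
  { order_id: "ord_1", status: "completed", __seq: 3 },
];
const visible = dedupeLatest(appended, ["order_id"]);
console.log(visible);
```

With `order_id` as the only unique key column, a query sees two rows, and `ord_1` carries the `completed` status from the later record.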
For Kafka messages, unique key column values can come from either the
message payload (the JSON value) or the message key. If a column
listed in unique_key_columns is missing from the payload, Cube Store
falls back to the Kafka message key: for a single unique key column, the
raw key value is used; for composite keys, the key is expected to be a
JSON object with matching field names.
In Kafka streams mode, Cube Store creates its internal pre-aggregation
table based on column type information. By default, column types are
inferred from the source ksqlDB stream using DESCRIBE. However, the
pre-aggregation's select_statement (generated from the rollup
definition) renames and transforms columns — for example, a source
column CREATED_AT becomes order_events_stream__created_at_second in
the output.
When this renaming happens, the raw source column types no longer match the output column names, causing errors like:
Key column order_events_stream__id not found among column definitions
To fix this, define output_column_types on the streaming
pre-aggregation. This tells Cube the exact output column types to use
for the Cube Store table, and separately passes the source schema so
Cube Store can deserialize the raw Kafka messages correctly.
pre_aggregations:
  - name: stream
    type: rollup
    read_only: true
    measures:
      - CUBE.count
      - CUBE.total_amount
      - CUBE.failed_count
    dimensions:
      - CUBE.order_id
      - CUBE.user_id
      - CUBE.status
    unique_key_columns:
      - order_id
    time_dimension: CUBE.created_at
    granularity: second
    partition_granularity: day
    build_range_start:
      sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))"
    build_range_end:
      sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')"
    refresh_key:
      every: 1 minute
      update_window: 1 hour
      incremental: true
    stream_offset: latest
    output_column_types:
      - member: CUBE.order_id
        type: text
      - member: CUBE.user_id
        type: text
      - member: CUBE.status
        type: text
      - member: CUBE.created_at.second
        type: timestamp
      - member: CUBE.count
        type: int
      - member: CUBE.total_amount
        type: decimal
      - member: CUBE.failed_count
        type: int
pre_aggregations: {
  stream: {
    type: `rollup`,
    read_only: true,
    measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
    dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
    unique_key_columns: [`order_id`],
    time_dimension: CUBE.created_at,
    granularity: `second`,
    partition_granularity: `day`,
    build_range_start: {
      sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`,
    },
    build_range_end: {
      sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`,
    },
    refresh_key: {
      every: `1 minute`,
      update_window: `1 hour`,
      incremental: true,
    },
    stream_offset: `latest`,
    output_column_types: [
      { member: CUBE.order_id, type: `text` },
      { member: CUBE.user_id, type: `text` },
      { member: CUBE.status, type: `text` },
      { member: CUBE.created_at.second, type: `timestamp` },
      { member: CUBE.count, type: `int` },
      { member: CUBE.total_amount, type: `decimal` },
      { member: CUBE.failed_count, type: `int` },
    ],
  },
},
Each entry in output_column_types has two properties:

- `member`: a reference to a dimension or measure included in the pre-aggregation. This must be a `CUBE` member reference (for example, `CUBE.user_id`), not a string. For the time dimension, reference it with the rollup granularity attached, for example `CUBE.created_at.second` when `granularity` is `second`.
- `type`: the Cube Store column type. Common values: `text`, `int`, `bigint`, `decimal`, `float`, `boolean`, `timestamp`.

Include an entry for every dimension and measure in the pre-aggregation.
The time dimension must also be listed, with `type: timestamp` and the
granularity suffix on the member reference as shown above.
When output_column_types is defined, Cube uses the aliased column
names (matching the select_statement) for the Cube Store table
definition and passes the raw source schema separately via
source_table, so Cube Store knows how to deserialize incoming Kafka
messages. Without it, column names come from the raw ksqlDB DESCRIBE
output and will not match the aliased names in the select_statement
or unique_key_columns.
output_column_types is required for Kafka streams mode when the
pre-aggregation uses unique_key_columns. Without it, the unique key
column names will not match the table column definitions, causing the
pre-aggregation build to fail.
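Because a missing entry only surfaces at build time, a small completeness check can be useful during review. The sketch below is a hypothetical helper that represents member references as strings (for example, `"CUBE.order_id"`); in a real data model they are `CUBE` member references.

```javascript
// Hypothetical check (not part of Cube): lists members of a streaming rollup
// that have no entry in output_column_types.
function missingOutputColumnTypes(preAggregation) {
  const expected = [
    ...preAggregation.dimensions,
    ...preAggregation.measures,
    // The time dimension is referenced with the rollup granularity attached.
    `${preAggregation.time_dimension}.${preAggregation.granularity}`,
  ];
  const declared = new Set(
    preAggregation.output_column_types.map((entry) => entry.member)
  );
  return expected.filter((member) => !declared.has(member));
}

const streamRollup = {
  dimensions: ["CUBE.order_id", "CUBE.user_id", "CUBE.status"],
  measures: ["CUBE.count", "CUBE.total_amount", "CUBE.failed_count"],
  time_dimension: "CUBE.created_at",
  granularity: "second",
  output_column_types: [
    { member: "CUBE.order_id", type: "text" },
    { member: "CUBE.user_id", type: "text" },
    { member: "CUBE.status", type: "text" },
    { member: "CUBE.count", type: "int" },
    { member: "CUBE.total_amount", type: "decimal" },
    { member: "CUBE.failed_count", type: "int" },
  ],
};

// The time dimension entry was left out on purpose.
const undeclared = missingOutputColumnTypes(streamRollup);
console.log(undeclared);
```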
Cube Store expects Kafka messages to have a JSON object as their value payload, with field names matching the column names defined in the cube. For example, given the streaming cube above, each Kafka message value should look like:
{
"ORDER_ID": "ord_12345",
"USER_ID": "usr_789",
"STATUS": "completed",
"AMOUNT": 49.99,
"CREATED_AT": "2025-01-15T10:30:00.000"
}
Field names are case-sensitive and must match the column names used in
the sql property of each dimension and measure definition. Missing
fields default to null.
The message key is optional. When present and the value starts with {,
it is parsed as a JSON object and used as a fallback source for unique
key column values (see above).
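The rules for reading a message can be modeled roughly as follows. This is a simplified sketch under stated assumptions, not Cube Store's parser: `readRow` is a hypothetical name, unique key columns are given by their source field names, and a field that is present but `null` is treated the same as a missing one.

```javascript
// Simplified model: reads columns from the JSON value and falls back to the
// message key for unique key columns that the payload does not provide.
function readRow(rawValue, rawKey, columns, uniqueKeyColumns) {
  const payload = JSON.parse(rawValue);
  const row = {};
  for (const column of columns) {
    // Field lookup is case-sensitive; missing fields default to null.
    row[column] = column in payload ? payload[column] : null;
  }

  const missing = uniqueKeyColumns.filter((column) => row[column] === null);
  if (missing.length > 0 && typeof rawKey === "string") {
    if (rawKey.startsWith("{")) {
      // Composite key: a JSON object with matching field names.
      const keyObject = JSON.parse(rawKey);
      for (const column of missing) {
        if (column in keyObject) row[column] = keyObject[column];
      }
    } else if (uniqueKeyColumns.length === 1) {
      // Single unique key column: use the raw key value.
      row[uniqueKeyColumns[0]] = rawKey;
    }
  }
  return row;
}

const fromKey = readRow(
  JSON.stringify({ STATUS: "completed", AMOUNT: 49.99 }),
  "ord_12345",
  ["ORDER_ID", "STATUS", "AMOUNT", "USER_ID"],
  ["ORDER_ID"]
);
console.log(fromKey);
```

In this example `ORDER_ID` comes from the raw message key, and `USER_ID` stays `null` because it appears in neither the payload nor the unique key.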
For dimensions with type: time, Cube Store accepts timestamp values in
two formats:

- Strings, for example:
  - `2025-01-15T10:30:00.000Z`
  - `2025-01-15T10:30:00Z`
  - `2025-01-15 10:30:00.000 UTC`
  - `2025-01-15T10:30:00`
  - `2025-01-15 10:30:00`
  - `2025-01-15`
- Numbers, given as milliseconds since the Unix epoch. For example, `1736937000000` represents `2025-01-15T10:30:00.000Z`.

If your Kafka topic produces timestamps as strings in a non-standard
format, you can use PARSE_TIMESTAMP in the cube's sql property to
convert them. In that case, define the source column as type: string
in a source_table and use the select_statement to transform it:
sql: `SELECT PARSE_TIMESTAMP(TIMESTAMP_STR,
'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') AS created_at,
ORDER_ID, USER_ID, STATUS, AMOUNT
FROM ORDER_EVENTS_STREAM`,
Time dimension truncation (controlled by the granularity property of
the pre-aggregation) is handled automatically. Cube generates the
appropriate PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))
expression chain to truncate timestamps to the configured granularity
(e.g., day, hour, minute). Cube Store evaluates these expressions
natively on each micro-batch during ingestion. Standard SQL functions
like date_trunc are also available in the select_statement.
bigint timestamps

When a source column stores a timestamp as a bigint (a common pattern
in Kafka topics — for example, a created_at field with values like
1778800167128), the value must be converted to a timestamp before it
can be used as a time dimension.
The conversion depends on the unit of the bigint value:

- Milliseconds (for example, `Date.now()` in JavaScript or Java's `System.currentTimeMillis()`): multiply by `1000` before casting, because `CAST(... AS TIMESTAMP(6))` interprets numeric input as microseconds. Without the multiplication, a millisecond value like `1778800167128` is read as microseconds and produces a timestamp in 1970 instead of the intended date.
- Microseconds: cast directly to `TIMESTAMP(6)`.
- Seconds: multiply by `1000000` before casting.

Apply the conversion in the time dimension's sql property:
dimensions: {
  created_at: {
    sql: `CAST(${CUBE}.created_at * 1000 AS TIMESTAMP(6))`,
    type: `time`
  }
}
The cast runs first; any granularity truncation generated by Cube
(PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))) then operates on
the resulting TIMESTAMP(6) value.
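To see why the unit matters, the arithmetic can be reproduced outside of Cube. The sketch below is plain JavaScript with hypothetical helper names; it only mimics the "numeric input is microseconds" interpretation described above and does not call Cube Store.

```javascript
// Multipliers that turn each unit into microseconds.
const MICROS_PER_UNIT = {
  seconds: 1_000_000,
  milliseconds: 1_000,
  microseconds: 1,
};

// Converts an epoch value in the given unit to microseconds (as a BigInt).
function toMicroseconds(value, unit) {
  const factor = MICROS_PER_UNIT[unit];
  if (factor === undefined) {
    throw new Error(`Unknown unit: ${unit}`);
  }
  return BigInt(value) * BigInt(factor);
}

// Renders a microsecond epoch value as an ISO string (millisecond precision).
function microsToIso(micros) {
  return new Date(Number(micros / 1000n)).toISOString();
}

const millis = 1778800167128;
// Correct: scale milliseconds to microseconds first.
const converted = microsToIso(toMicroseconds(millis, "milliseconds"));
// Incorrect: the raw millisecond value read as microseconds.
const misread = microsToIso(BigInt(millis));

console.log(converted); // a date in May 2026
console.log(misread); // a date in January 1970
```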
When the streaming cube defines a sql property with a SELECT
statement (rather than sql_table), Cube Store applies the projection
and any WHERE filters from that statement directly on each micro-batch
of incoming Kafka messages. This filtering happens inside Cube Store
using its query engine — it does not require ksqlDB to process the
filter. Only rows that pass the filter are ingested into the
pre-aggregation table.
This allows you to define a streaming cube that only ingests a subset of the data from the underlying Kafka topic without creating any server-side filter objects in ksqlDB.
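Conceptually, each micro-batch goes through a filter step and then a projection step. The sketch below models that flow on plain JavaScript objects; `processMicroBatch` is a hypothetical name and this is not how Cube Store is implemented internally.

```javascript
// Conceptual model: apply an optional WHERE predicate, then project each
// surviving row into its output columns.
function processMicroBatch(batch, { where, projection }) {
  return batch
    .filter((row) => (where ? where(row) : true))
    .map((row) => {
      const output = {};
      for (const [alias, expression] of Object.entries(projection)) {
        output[alias] = expression(row);
      }
      return output;
    });
}

// Rough equivalent of:
//   SELECT ORDER_ID AS order_id, AMOUNT AS amount
//   FROM ORDER_EVENTS_STREAM WHERE STATUS = 'failed'
const ingested = processMicroBatch(
  [
    { ORDER_ID: "ord_1", STATUS: "failed", AMOUNT: 10 },
    { ORDER_ID: "ord_2", STATUS: "completed", AMOUNT: 25 },
    { ORDER_ID: "ord_3", STATUS: "failed", AMOUNT: 40 },
  ],
  {
    where: (row) => row.STATUS === "failed",
    projection: {
      order_id: (row) => row.ORDER_ID,
      amount: (row) => row.AMOUNT,
    },
  }
);
console.log(ingested);
```

Only the two failed orders reach the pre-aggregation table in this model, and each row carries just the projected columns.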
The SELECT statement must follow a strict shape. Cube Store only
accepts plans that resolve to Projection > Filter > TableScan (where
the filter is optional). Any other query plan shape is rejected.
Supported:

- Simple `SELECT` with column references (e.g., `SELECT col1, col2 FROM topic`)
- `SELECT *` wildcard
- Column aliases (`SELECT col1 AS my_alias`)
- `WHERE` clause with comparison operators (`=`, `!=`, `<`, `>`, `<=`, `>=`)
- Logical operators in `WHERE` (`AND`, `OR`, `NOT`)
- `IS NULL` and `IS NOT NULL`
- `IN` lists (`col IN (1, 2, 3)`)
- `BETWEEN` expressions
- `CASE ... WHEN ... THEN ... ELSE ... END` expressions
- `CAST(expr AS type)` type conversions
- `EXTRACT(field FROM expr)` for date/time parts
- `SUBSTRING(expr FROM start FOR length)`
- Scalar functions and expressions (`COALESCE`, `CONCAT`, arithmetic)
- `CONVERT_TZ` for timezone conversion (internally rewritten for compatibility)
- `PARSE_TIMESTAMP` and `FORMAT_TIMESTAMP` for timestamp parsing and formatting using ksql-style format strings (e.g., `yyyy-MM-dd'T'HH:mm:ss.SSS`)
- `date_trunc` for timestamp truncation

Not supported:

- `JOIN` clauses: only a single `FROM` table is allowed
- Subqueries in `SELECT` or `WHERE`
- `GROUP BY`, `HAVING`, or aggregate functions (`SUM`, `COUNT`, `AVG`, etc.)
- `ORDER BY` (rows are consumed in stream order)
- `LIMIT` and `OFFSET`
- `UNION`, `INTERSECT`, `EXCEPT`
- Window functions (`OVER`, `PARTITION BY`)
- Subqueries in `FROM` or multiple `WHERE` clauses
- Common table expressions (`WITH ... AS`)

All column expressions in the SELECT list that are not simple column
references must have explicit aliases. Unique key columns may reference
the source column through a scalar function (e.g.,
CAST(id AS VARCHAR) AS id), but not through arbitrary expressions.