Flink SQL currently supports the following materialized table statements:
```sql
CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  [(
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )]
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  [WITH (key1=val1, key2=val2, ...)]
  [FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }]
  [REFRESH_MODE = { CONTINUOUS | FULL }]
  AS <select_statement>

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
```
PRIMARY KEY defines an optional set of columns that uniquely identifies each row in the table. Primary key columns must be non-nullable.

PARTITIONED BY defines an optional set of columns used to partition the materialized table. If the materialized table uses the filesystem connector, one directory is created per partition.
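A primary key can be declared as a table constraint while the column list is otherwise derived from the query; a minimal sketch (the table, column, and source names here are hypothetical):

```sql
-- Declare `order_id` as the primary key. NOT ENFORCED is required because
-- Flink does not own the data and therefore cannot enforce the constraint.
CREATE MATERIALIZED TABLE my_orders (
  PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT order_id, SUM(amount) AS total_amount
FROM source_orders
GROUP BY order_id;
```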
Example:

```sql
-- Create a materialized table partitioned by `ds`.
CREATE MATERIALIZED TABLE my_materialized_table
  PARTITIONED BY (ds)
  FRESHNESS = INTERVAL '1' HOUR
  AS SELECT
    ds
  FROM
    ...
```
WITH options define the properties required to create the materialized table, including [connector options]({{< ref "docs/connectors/table/" >}}) and the [date-formatter option]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) for partition fields.
```sql
-- Create a materialized table, specifying `ds` as the partition field with the date format 'yyyy-MM-dd'
CREATE MATERIALIZED TABLE my_materialized_table
  PARTITIONED BY (ds)
  WITH (
    'format' = 'json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  ...
```
As shown above, the date-formatter option is specified for the `ds` partition column. On each scheduled run, the schedule time is converted to the corresponding `ds` partition value. For example, for a schedule time of 2024-01-01 00:00:00, only the partition ds = '2024-01-01' is refreshed.
<span class="label label-danger">Note</span> The `partition.fields.#.date-formatter` option only takes effect in full mode.
FRESHNESS specifies the data freshness of the materialized table.

FRESHNESS is optional. When omitted, the system uses a default freshness based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
Relationship Between Data Freshness and Refresh Mode

Data freshness defines the maximum time by which the materialized table's content may lag behind updates to the base tables. When not specified, the configured default for the refresh mode is used. Freshness serves two purposes: first, together with the [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold), it determines the table's [refresh mode]({{< ref "docs/sql/materialized-table/overview" >}}#刷新模式); second, it determines the data refresh frequency needed to satisfy the target freshness.
FRESHNESS Parameter Details

The FRESHNESS parameter accepts values of the form INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }. '<num>' must be a positive integer, and in full mode '<num>' must be a divisor of the corresponding time unit (for example, a SECOND value must evenly divide 60).
Example:

(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)

```sql
-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 second
FRESHNESS = INTERVAL '1' SECOND

-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 minute
FRESHNESS = INTERVAL '1' MINUTE

-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour
FRESHNESS = INTERVAL '1' HOUR

-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 day
FRESHNESS = INTERVAL '1' DAY
```
Default FRESHNESS Example:

(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, `materialized-table.default-freshness.full` is 1 hour, and `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)

```sql
-- FRESHNESS is omitted, so the configured default of 3 minutes for CONTINUOUS mode is used.
-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 3 minutes.
CREATE MATERIALIZED TABLE my_materialized_table
  AS SELECT * FROM source_table;

-- FRESHNESS is omitted and FULL mode is explicitly specified, so the configured default of 1 hour is used.
-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour.
CREATE MATERIALIZED TABLE my_materialized_table_full
  REFRESH_MODE = FULL
  AS SELECT * FROM source_table;
```
Invalid FRESHNESS examples:

```sql
-- Negative interval
FRESHNESS = INTERVAL '-1' SECOND

-- Zero interval
FRESHNESS = INTERVAL '0' SECOND

-- Month or year intervals
FRESHNESS = INTERVAL '1' MONTH
FRESHNESS = INTERVAL '1' YEAR

-- In full mode, the interval is not a divisor of the corresponding time unit
FRESHNESS = INTERVAL '60' SECOND
FRESHNESS = INTERVAL '5' HOUR
```
<span class="label label-danger">Note</span>

- In continuous mode, the data freshness matches the job's checkpoint interval, so setting an overly short freshness may affect job performance. In addition, to optimize checkpoint performance, it is recommended to [enable Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog).
- In full mode, the data freshness is converted to a cron expression, so only freshness intervals within the predefined time-interval units are currently supported; this design keeps the behavior consistent with cron-expression semantics. Concretely, the freshness interval must be a divisor of its time unit (for example, a SECOND or MINUTE value must evenly divide 60, and an HOUR value must evenly divide 24).
REFRESH_MODE explicitly specifies the refresh mode of the materialized table. An explicitly specified mode takes precedence over the mode derived by the framework, to satisfy scenario-specific needs.
Example:

(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)

```sql
-- The created materialized table uses continuous mode, with a job checkpoint interval of 1 hour.
CREATE MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '1' HOUR
  REFRESH_MODE = CONTINUOUS
  AS SELECT
  ...

-- The created materialized table uses full mode, with a job schedule cycle of 10 minutes.
CREATE MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' MINUTE
  REFRESH_MODE = FULL
  AS SELECT
  ...
```
This clause defines the query that populates the materialized table. The upstream tables can be materialized tables, regular tables, or views. The select statement supports all Flink SQL [queries]({{< ref "docs/sql/reference/queries/overview" >}}).
Example:

```sql
CREATE MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' SECOND
  AS SELECT * FROM kafka_catalog.db1.kafka_table;
```
The OR ALTER clause provides create-or-update semantics:

- If the materialized table does not exist, it is created.
- If it already exists, it is updated (similarly to ALTER MATERIALIZED TABLE AS).

This is particularly useful in declarative deployment scenarios where you want to define the desired state without first checking whether the materialized table already exists.

When the materialized table exists, the operation updates it in the same way as ALTER MATERIALIZED TABLE AS, following the full-mode or continuous-mode procedure described there. See ALTER MATERIALIZED TABLE AS for more details.
Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes.

Create a materialized table with a data freshness of 10 seconds; the derived refresh mode is continuous:
```sql
CREATE MATERIALIZED TABLE my_materialized_table_continuous
  PARTITIONED BY (ds)
  WITH (
    'format' = 'debezium-json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  FRESHNESS = INTERVAL '10' SECOND
  AS
  SELECT
    k.ds,
    k.user_id,
    COUNT(*) AS event_count,
    SUM(k.amount) AS total_amount,
    MAX(u.age) AS max_age
  FROM
    kafka_catalog.db1.kafka_table k
  JOIN
    user_catalog.db1.user_table u
  ON
    k.user_id = u.user_id
  WHERE
    k.event_type = 'purchase'
  GROUP BY
    k.ds, k.user_id
```
Create a materialized table with a data freshness of 1 hour; the derived refresh mode is full:
```sql
CREATE MATERIALIZED TABLE my_materialized_table_full
  PARTITIONED BY (ds)
  WITH (
    'format' = 'json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  FRESHNESS = INTERVAL '1' HOUR
  AS
  SELECT
    p.ds,
    p.product_id,
    p.product_name,
    AVG(s.sale_price) AS avg_sale_price,
    SUM(s.quantity) AS total_quantity
  FROM
    paimon_catalog.db1.product_table p
  LEFT JOIN
    paimon_catalog.db1.sales_table s
  ON
    p.product_id = s.product_id
  WHERE
    p.category = 'electronics'
  GROUP BY
    p.ds, p.product_id, p.product_name
```
The same materialized table with explicitly specified columns:

```sql
CREATE MATERIALIZED TABLE my_materialized_table_full (
  ds, product_id, product_name, avg_sale_price, total_quantity)
  ...
```
The order of the columns does not need to match the query; Flink reorders them if required, so the following is also valid:

```sql
CREATE MATERIALIZED TABLE my_materialized_table_full (
  product_id, product_name, ds, avg_sale_price, total_quantity)
  ...
```
Another option is to specify both column names and data types:

```sql
CREATE MATERIALIZED TABLE my_materialized_table_full (
  ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity BIGINT)
  ...
```
The declared column types may differ from the types derived from the query; in that case implicit casts are applied. If no implicit cast is supported for some combination, a validation error is thrown. Note that column reordering also works in this form.
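For instance, a declared type may widen the type the query derives, as long as an implicit cast exists between them; a hedged sketch based on the table above (assuming `SUM(s.quantity)` is derived as BIGINT, which Flink can implicitly cast to the declared DOUBLE):

```sql
-- `total_quantity` is declared DOUBLE while the query derives BIGINT;
-- an implicit cast is applied when writing the result.
CREATE MATERIALIZED TABLE my_materialized_table_full (
  ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity DOUBLE)
  ...
```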
Create or alter a materialized table, executed twice:

```sql
-- First execution: creates the materialized table
CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' SECOND
  AS
  SELECT
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
  FROM
    kafka_catalog.db1.events
  WHERE
    event_type = 'purchase'
  GROUP BY
    user_id;

-- Second execution: alters the query definition (adds avg_amount column)
CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' SECOND
  AS
  SELECT
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount -- Add a new nullable column at the end
  FROM
    kafka_catalog.db1.events
  WHERE
    event_type = 'purchase'
  GROUP BY
    user_id;
```
<span class="label label-danger">Note</span> Currently, altering an existing materialized table only supports appending nullable columns to the end of the original materialized table's schema.

```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] |
  REFRESH [PARTITION partition_spec] |
  AS <select_statement>
```
ALTER MATERIALIZED TABLE manages materialized tables. With this command, users can suspend and resume a table's refresh pipeline, manually trigger a data refresh, and modify the table's query definition.
```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND
```

SUSPEND suspends the materialized table's background refresh pipeline.
Example:

```sql
-- Specify the SAVEPOINT path before suspending
SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path';

-- Suspend the specified materialized table
ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;
```
<span class="label label-danger">Note</span> When suspending a materialized table in continuous mode, the job is stopped via STOP WITH SAVEPOINT by default, so you need to set the savepoint directory with the corresponding [configuration]({{< ref "docs/deployment/config" >}}#execution-checkpointing-savepoint-dir).

```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]
```
RESUME resumes the refresh pipeline of a materialized table. When resuming, table parameters can be specified dynamically via the WITH clause; they apply only to the refresh pipeline started by this resume operation and are not persisted to the materialized table.
Example:

```sql
-- Resume the specified materialized table
ALTER MATERIALIZED TABLE my_materialized_table RESUME;

-- Resume the specified materialized table and set the sink parallelism
ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');
```
```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH [PARTITION partition_spec]
```

REFRESH manually triggers a refresh of the materialized table.
Example:

```sql
-- Refresh the data of the whole table
ALTER MATERIALIZED TABLE my_materialized_table REFRESH;

-- Refresh the data of the specified partition
ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28');
```
<span class="label label-danger">Note</span> The REFRESH operation starts a Flink batch job to refresh the materialized table's data.
```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement>
```
The AS <select_statement> clause modifies the query definition used to refresh the materialized table. It first updates the table's schema to the schema derived from the new query, and then refreshes the table data using the new query. Note that, by default, this does not affect historical data.

The exact procedure depends on the refresh mode of the materialized table:

Full mode:
- Updates the materialized table's schema and query definition; the new query takes effect from the next refresh.

Continuous mode:
- Stops the current refresh job, updates the materialized table's schema and query definition, and starts a new job based on the new query.

Example:
```sql
-- Original materialized table definition
CREATE MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' SECOND
  AS
  SELECT
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
  FROM
    kafka_catalog.db1.events
  WHERE
    event_type = 'purchase'
  GROUP BY
    user_id;

-- Modify the query of the existing materialized table
ALTER MATERIALIZED TABLE my_materialized_table
  AS SELECT
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount -- Append a new nullable column at the end
  FROM
    kafka_catalog.db1.events
  WHERE
    event_type = 'purchase'
  GROUP BY
    user_id;
```
<span class="label label-danger">Note</span> Currently, the ALTER MATERIALIZED TABLE ... AS statement only supports appending nullable columns to the end of the original table's schema.

```sql
DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name
```
When a materialized table is dropped, the background refresh pipeline is deleted first, and then the table's metadata is removed from the catalog.
Example:

```sql
-- Drop the specified materialized table
DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;
```