doc/user/content/sql/create-sink/iceberg.md
{{< public-preview />}}
Use `CREATE SINK ... INTO ICEBERG CATALOG ...` to create Iceberg sinks. Iceberg
sinks write data from Materialize into an Iceberg table hosted on
AWS S3 Tables. As data changes in Materialize, your Iceberg tables are
automatically kept up to date.
To create an Iceberg sink, you need:

- An AWS connection
- An Iceberg catalog connection
{{% include-syntax file="examples/create_sink_iceberg" example="syntax" %}}
Iceberg sinks continuously stream changes from Materialize to an Iceberg table. Specifically, Materialize writes data as Parquet files to the object storage backing your Iceberg catalog.
At each `COMMIT INTERVAL`, Materialize commits a new Iceberg snapshot that
includes the newly written files. When the snapshot is committed, the data is
available to downstream query engines. See [Commit interval tradeoffs](#commit-interval-tradeoffs).
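Once a snapshot is committed, any Iceberg-aware engine can read the table. As a minimal sketch, assuming a Spark session configured with an Iceberg catalog named `my_catalog` and a sink writing to `raw.events` (both names are placeholders):

```sql
-- Illustrative Spark SQL; catalog, namespace, and table names are
-- placeholders for whatever your sink targets.
SELECT *
FROM my_catalog.raw.events
LIMIT 10;
```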
If the specified Iceberg table does not exist, Materialize creates it.
Materialize creates unpartitioned tables. {{< include-from-yaml data="examples/create_sink_iceberg" name="restrictions-limitations-partitioned-tables" >}}
See also: Restrictions and limitations.
{{< include-from-yaml data="examples/create_sink_iceberg" name="exactly-once-delivery" >}}
## Commit interval tradeoffs

The `COMMIT INTERVAL` setting involves tradeoffs between latency and efficiency:

| Shorter intervals (e.g., < 60s) | Longer intervals (e.g., 5m) |
|---|---|
| Lower latency - data visible sooner | Higher latency - data takes longer to appear |
| More small files - can degrade query performance | Fewer, larger files - better query performance |
| Higher catalog overhead | Lower catalog overhead |
| Higher S3 write costs (more PUT requests) | Lower S3 write costs |
Recommendations:

- 60s to 5m: a good default when data freshness matters.
- 5m to 15m: appropriate when fewer, larger files and lower catalog overhead matter more than latency.

{{< note >}}
Outside of development environments, commit intervals should be at least 60s.
Short commit intervals increase catalog overhead and produce many small files.
Small files result in degraded query performance. Short commit intervals also
increase load on the Iceberg metadata, which can result in a degraded catalog
and unresponsive queries.
{{< /note >}}
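As an illustration, a sink tuned for efficiency over latency might use a longer commit interval. This sketch mirrors the syntax shown in the examples below; all object names are placeholders:

```mzsql
-- Hypothetical sink with a 5-minute commit interval: fewer, larger
-- Parquet files and less catalog churn, at the cost of data freshness.
CREATE SINK events_analytics_sink
  IN CLUSTER my_cluster
  FROM events
  INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
    NAMESPACE = 'analytics',
    TABLE = 'events'
  )
  USING AWS CONNECTION aws_connection
  KEY (event_id)
  MODE UPSERT
  WITH (COMMIT INTERVAL = '5m');
```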
The Iceberg sink uses upsert semantics based on the `KEY`. The columns you
specify as the `KEY` must uniquely identify rows. Materialize validates that the
key is unique; if it cannot prove uniqueness, you'll receive an error.

If you have outside knowledge that the key is unique, you can bypass validation
using `NOT ENFORCED`. However, if the key is not actually unique, downstream
consumers may see incorrect results.
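Before reaching for `NOT ENFORCED`, you can spot-check uniqueness yourself. A minimal sketch, assuming a source relation `my_source` and a candidate key column `event_id`:

```mzsql
-- If this returns any rows, the candidate key is not unique and
-- NOT ENFORCED would lead to incorrect downstream results.
SELECT event_id, count(*) AS duplicates
FROM my_source
GROUP BY event_id
HAVING count(*) > 1;
```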
{{% include-headless "/headless/iceberg-sinks/type-mapping" %}}
{{% include-headless "/headless/iceberg-sinks/limitations-list" %}}
Iceberg sinks use a hybrid delete strategy:

- Rows retracted shortly after they are written are removed with position
  deletes, which reference the row's position in the data file.
- Rows written in earlier commits are removed with equality deletes, which
  match rows by their `KEY` column values.

This means short-lived rows use efficient position deletes, while updates to
older data use equality deletes.
{{< tip >}} Consider running Iceberg compaction periodically to merge delete files and improve query performance. {{< /tip >}}
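For example, with an engine that supports Iceberg's maintenance procedures (such as Spark), compaction might look like the following sketch; the catalog and table names are placeholders:

```sql
-- Illustrative Spark SQL using Iceberg's built-in procedures.
-- Merge small data files into larger ones:
CALL my_catalog.system.rewrite_data_files(table => 'raw.events');
-- Compact position delete files:
CALL my_catalog.system.rewrite_position_delete_files(table => 'raw.events');
```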
{{% include-headless "/headless/sql-command-privileges/create-sink" %}}
{{% include-headless "/headless/iceberg-sinks/troubleshooting" %}}
To create an Iceberg sink, you need an AWS connection and an Iceberg catalog connection.
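For instance, the AWS connection might be created along these lines (a sketch only; the role ARN is a placeholder, and your authentication details may differ):

```mzsql
-- Hypothetical AWS connection that authenticates via role assumption.
CREATE CONNECTION aws_connection TO AWS (
  ASSUME ROLE ARN = 'arn:aws:iam::123456789012:role/materialize-iceberg-sink'
);
```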
{{% include-example file="examples/create_connection" example="example-iceberg-catalog-connection" %}}
{{% include-example file="examples/create_sink_iceberg" example="example-create-iceberg-sink" %}}
The required `KEY` clause uniquely identifies rows; in this example, it uses a
composite key of `user_id` and `event_timestamp`. Materialize validates that
this key is unique in the source data.
If Materialize cannot prove your key is unique but you have outside knowledge
that it is, you can bypass validation by including the `NOT ENFORCED` option:
```mzsql
CREATE SINK deduped_sink
  IN CLUSTER my_cluster
  FROM my_source
  INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
    NAMESPACE = 'raw',
    TABLE = 'events'
  )
  USING AWS CONNECTION aws_connection
  KEY (event_id) NOT ENFORCED
  MODE UPSERT
  WITH (COMMIT INTERVAL = '1m');
```
{{< warning >}} If the key is not actually unique, downstream consumers may see incorrect results. {{< /warning >}}