{{< source-versioning-disambiguation is_new=false other_ref="new reference page" include_blurb=true >}}
{{% create-source/intro %}}
Materialize supports PostgreSQL (11+) as a data source. To connect to a
PostgreSQL instance, you first need to create a connection
that specifies access and authentication parameters.
Once created, a connection is reusable across multiple CREATE SOURCE
statements.
{{% /create-source/intro %}}
{{< warning >}} Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted. {{< /warning >}}
{{< include-md file="shared-content/aws-privatelink-cloud-only-note.md" >}}
{{% include-syntax file="examples/create_source_postgres_legacy" example="syntax" %}}
This source uses PostgreSQL's native replication protocol to continually ingest
changes resulting from INSERT, UPDATE and DELETE operations in the
upstream database — a process also known as change data capture.
For this reason, you must configure the upstream PostgreSQL database to support logical replication before creating a source in Materialize. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.
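As a rough sketch of what that upstream configuration usually looks like on a self-managed PostgreSQL instance, the statements below run against PostgreSQL, not Materialize. Managed services differ, so the integration guide for your service remains the authoritative reference. The table names `table_1` and `table_2` are placeholders, and the publication name `mz_source` is borrowed from the examples on this page.

```sql
-- Run in the upstream PostgreSQL database, not in Materialize.

-- Logical replication requires wal_level to be 'logical'.
SHOW wal_level;

-- Placeholder tables. REPLICA IDENTITY FULL makes UPDATE and DELETE
-- events carry the full previous row.
ALTER TABLE table_1 REPLICA IDENTITY FULL;
ALTER TABLE table_2 REPLICA IDENTITY FULL;

-- Publication that the Materialize source refers to by name.
CREATE PUBLICATION mz_source FOR TABLE table_1, table_2;
```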
To avoid creating multiple replication slots in the upstream PostgreSQL database and to minimize the required bandwidth, Materialize ingests the raw replication stream data for a specific set of tables in your publication.
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES;
When you define a source, Materialize will automatically:
Create a replication slot in the upstream PostgreSQL database (see PostgreSQL replication slots).
The name of the replication slot created by Materialize is prefixed with
materialize_ for easy identification, and can be looked up in
mz_internal.mz_postgres_sources.
SELECT id, replication_slot FROM mz_internal.mz_postgres_sources;
id | replication_slot
--------+----------------------------------------------
u8 | materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71
Create a subsource for each original table in the publication.
SHOW SOURCES;
name | type
----------------------+-----------
mz_source | postgres
mz_source_progress | progress
table_1 | subsource
table_2 | subsource
Perform an initial, snapshot-based sync of the tables in the publication before it starts ingesting change events.
Incrementally update any materialized or indexed views that depend on the
source as change events stream in, as a result of INSERT, UPDATE and
DELETE operations in the upstream PostgreSQL database.
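To illustrate the last step, here is a minimal sketch of a view that Materialize would keep up to date as change events arrive. It assumes the `table_1` subsource from the `SHOW SOURCES` output above; the view name `table_1_row_count` is hypothetical.

```sql
-- Hypothetical materialized view over the table_1 subsource.
CREATE MATERIALIZED VIEW table_1_row_count AS
SELECT count(*) AS row_count
FROM table_1;

-- The result reflects upstream INSERT, UPDATE and DELETE operations
-- without re-running the aggregation from scratch.
SELECT row_count FROM table_1_row_count;
```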
Each source ingests the raw replication stream data for all tables in the specified publication using a single replication slot. This allows you to minimize the performance impact on the upstream database, as well as reuse the same source across multiple materializations.
{{< tip >}}
{{% include-from-yaml data="postgres_source_details" name="postgres-replication-slots-tip-list" %}}
{{</ tip >}}
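To inspect the slot from the PostgreSQL side, a query along these lines can be run in the upstream database. It relies only on the standard `pg_replication_slots` catalog view and on the `materialize_` prefix described above; the choice of columns is a suggestion, not a requirement.

```sql
-- Run in the upstream PostgreSQL database, not in Materialize.
-- starts_with() is available in PostgreSQL 11 and later.
SELECT slot_name, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots
WHERE starts_with(slot_name, 'materialize_');
```

A slot that stays inactive for a long time retains WAL on the upstream server, so it is worth checking the `active` column after dropping or recreating sources.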
CREATE SOURCE will attempt to create each upstream table in the same schema as
the source. This may lead to naming collisions if, for example, you are
replicating schema1.table_1 and schema2.table_1. In such cases, use the
FOR TABLES clause to provide aliases for each upstream table or to specify
an alternative destination schema in Materialize.
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1);
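The same clause can also route subsources into different Materialize schemas. The sketch below assumes that schema-qualified aliases are accepted in `FOR TABLES` and uses two hypothetical schemas, `s1` and `s2`, created beforehand; treat it as an illustration rather than canonical syntax.

```sql
-- Hypothetical destination schemas in Materialize.
CREATE SCHEMA s1;
CREATE SCHEMA s2;

-- Each upstream table lands in its own schema, so both can keep
-- the name table_1 without colliding.
CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR TABLES (schema1.table_1 AS s1.table_1, schema2.table_1 AS s2.table_1);
```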
By default, PostgreSQL sources expose progress metadata as a subsource that you
can use to monitor source ingestion progress. The name of the progress
subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.
The following metadata is available for each source as a progress subsource:
| Field | Type | Meaning |
|---|---|---|
| lsn | uint8 | The last Log Sequence Number (LSN) consumed from the upstream PostgreSQL replication stream. |
You can query the progress subsource using:
SELECT lsn
FROM <src_name>_progress;
The reported LSN should increase as Materialize consumes new WAL records from the upstream PostgreSQL database. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.
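As a minimal sketch of the `EXPOSE PROGRESS AS` clause mentioned above, the statement below names the progress subsource explicitly. The name `mz_source_lsn` is hypothetical, and the example assumes the clause is placed after the `FOR ALL TABLES` clause.

```sql
-- Hypothetical progress subsource name: mz_source_lsn.
CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR ALL TABLES
  EXPOSE PROGRESS AS mz_source_lsn;

-- Query the custom-named progress subsource instead of mz_source_progress.
SELECT lsn FROM mz_source_lsn;
```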
Materialize supports schema changes in the upstream database as follows:
{{% include-from-yaml data="postgres_source_details" name="postgres-compatible-schema-changes-legacy" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-incompatible-schema-changes-legacy" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-publication-membership" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-publication-membership-mitigation-legacy" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-supported-types" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-unsupported-types" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-truncation-restriction" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-inherited-tables" %}}
{{% include-from-yaml data="postgres_source_details" name="postgres-inherited-tables-action-legacy" %}}
{{< important >}} Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted. {{< /important >}}
A connection describes how to connect and authenticate to an external system you want Materialize to read data from.
Once created, a connection is reusable across multiple CREATE SOURCE
statements. For more details on creating connections, check the
CREATE CONNECTION documentation page.
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
USER 'postgres',
PASSWORD SECRET pgpass,
SSL MODE 'require',
DATABASE 'postgres'
);
If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service (Materialize Cloud) or an SSH bastion host.
{{< tabs tabID="1" >}} {{< tab "AWS PrivateLink">}}
{{< include-md file="shared-content/aws-privatelink-cloud-only-note.md" >}}
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
USER 'postgres',
PASSWORD SECRET pgpass,
AWS PRIVATELINK privatelink_svc,
DATABASE 'postgres'
);
For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.
{{< /tab >}} {{< tab "SSH tunnel">}}
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST 'bastion-host',
PORT 22,
USER 'materialize'
);
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
SSH TUNNEL ssh_connection,
DATABASE 'postgres'
);
For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
{{< /tab >}} {{< /tabs >}}
Create subsources for all tables included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES;
Create subsources for all tables from specific schemas included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR SCHEMAS (public, project);
Create subsources for specific tables included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR TABLES (table_1, table_2 AS alias_table_2);
If the publication contains tables that use data types
unsupported by Materialize, use the TEXT COLUMNS option to decode data as
text for the affected columns. This option expects the upstream names of the
replicated table and column (i.e. as defined in your PostgreSQL database).
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (
PUBLICATION 'mz_source',
TEXT COLUMNS (upstream_table_name.column_of_unsupported_type)
) FOR ALL TABLES;
{{% include-headless "/headless/schema-changes-in-progress" %}}
To handle upstream schema changes or errored subsources, use
the DROP SOURCE syntax to drop the affected
subsource, and then ALTER SOURCE...ADD SUBSOURCE to add
the subsource back to the source.
-- List all subsources in mz_source
SHOW SUBSOURCES ON mz_source;
-- Get rid of an outdated or errored subsource
DROP SOURCE table_1;
-- Start ingesting the table with the updated schema or fix
ALTER SOURCE mz_source ADD SUBSOURCE table_1;
When adding subsources to a PostgreSQL source, Materialize opens a temporary replication slot to snapshot the current state of the new subsources. After the snapshot completes, the newly added tables are kept up to date, like all other tables in the publication.
Dropping a subsource prevents Materialize from ingesting any data from it, in addition to dropping any state that Materialize previously had for the table.
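To find subsources that may need this treatment, one option is to look at source statuses. The query below is a sketch that assumes the `mz_internal.mz_source_statuses` relation exposes `name`, `type`, `status` and `error` columns; objects in `mz_internal` are not a stable interface, so verify the column names against your Materialize version.

```sql
-- Assumption: mz_internal.mz_source_statuses has these columns.
-- Lists sources and subsources that are not currently running.
SELECT name, type, status, error
FROM mz_internal.mz_source_statuses
WHERE status <> 'running';
```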
- CREATE SECRET
- CREATE CONNECTION
- CREATE SOURCE