Back to Materialize

CREATE SINK: Kafka/Redpanda

doc/user/content/sql/create-sink/kafka.md

12331.0 KB
Original Source

{{< note >}} The CREATE SINK syntax, supported formats, and features are the same for Kafka and Redpanda broker. For simplicity, this page uses "Kafka" to refer to both Kafka and Redpanda. {{< /note >}}

CREATE SINK connects Materialize to an external system you want to write data to, and provides details about how to encode that data.

To use a Kafka broker (and optionally a schema registry) as a sink, make sure that a connection that specifies access and authentication parameters to that broker already exists; otherwise, you first need to create a connection. Once created, a connection is reusable across multiple CREATE SINK and CREATE SOURCE statements.

Sink source typeDescription
SourceSimply pass all data received from the source to the sink without modifying it.
TableStream all changes to the specified table out to the sink.
Materialized viewStream all changes to the view to the sink. This lets you use Materialize to process a stream, and then stream the processed values. Note that this feature only works with materialized views, and does not work with non-materialized views.

Syntax

{{< tabs level=3 >}}

{{< tab "Format Avro" >}}

{{% include-syntax file="examples/create_sink_kafka" example="syntax-avro" %}}

{{< /tab >}}

{{< tab "Format JSON" >}}

{{% include-syntax file="examples/create_sink_kafka" example="syntax-json" %}}

{{< /tab >}}

{{< tab "Format TEXT/BYTES" >}}

{{% include-syntax file="examples/create_sink_kafka" example="syntax-text-bytes" %}}

{{< /tab >}}

{{< tab "KEY FORMAT VALUE FORMAT" >}}

By default, the message key is encoded using the same format as the message value. However, you can set the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT.

{{% include-syntax file="examples/create_sink_kafka" example="syntax-key-value-format" %}}

{{< /tab >}}

{{< /tabs >}}

Headers

{{< private-preview />}}

Materialize always adds a header with key materialize-timestamp to each message emitted by the sink. The value of this header indicates the logical time at which the event described by the message occurred.

The HEADERS option allows specifying the name of a column containing additional headers to add to each message emitted by the sink. When the option is unspecified, no additional headers are added. When specified, the named column must be of type map[text => text] or map[text => bytea].

Header keys starting with materialize- are reserved for Materialize's internal use. Materialize will ignore any headers in the map whose key starts with materialize-.

Known limitations:

  • Materialize does not permit adding multiple headers with the same key.
  • Materialize cannot omit the headers column from the message value.
  • Materialize only supports using the HEADERS option with the upsert envelope.

Formats

The FORMAT option controls the encoding of the message key and value that Materialize writes to Kafka.

To use a different format for keys and values, use KEY FORMAT .. VALUE FORMAT .. to choose independent formats for each.

Avro

<p style="font-size:14px"><b>Syntax:</b> <code>FORMAT AVRO</code></p>

When using the Avro format, the value of each Kafka message is an Avro record containing a field for each column of the sink's upstream relation. The names and ordering of the fields in the record match the names and ordering of the columns in the relation.

If the KEY option is specified, the key of each Kafka message is an Avro record containing a field for each key column, in the same order and with the same names.

If a column name is not a valid Avro name, Materialize adjusts the name according to the following rules:

  • Replace all non-alphanumeric characters with underscores.
  • If the name begins with a number, add an underscore at the start of the name.
  • If the adjusted name is not unique, add the smallest number possible to the end of the name to make it unique.

For example, consider a table with two columns named col-a and col@a. Materialize will use the names col_a and col_a1, respectively, in the generated Avro schema.

When using a Confluent Schema Registry:

  • Materialize will automatically publish Avro schemas for the key, if present, and the value to the registry.

  • You can specify the fullnames for the Avro schemas Materialize generates using the AVRO KEY FULLNAME and AVRO VALUE FULLNAME syntax.

  • You can automatically have nullable fields in the Avro schemas default to null by using the NULL DEFAULTS option.

  • You can add doc fields to the Avro schemas.

SQL types are converted to Avro types according to the following conversion table:

SQL typeAvro type
bigint"long"
boolean"boolean"
bytea"bytes"
date{"type": "int", "logicalType": "date"}
double precision"double"
integer"int"
interval{"type": "fixed", "size": 16, "name": "com.materialize.sink.interval"}
jsonb{"type": "string", "connect.name": "io.debezium.data.Json"}
map{"type": "map", "values": ...}
list{"type": "array", "items": ...}
numeric(p,s){"type": "bytes", "logicalType": "decimal", "precision": p, "scale": s}
oid{"type": "fixed", "size": 4, "name": "com.materialize.sink.uint4"}
real"float"
record{"type": "record", "name": ..., "fields": ...}
smallint"int"
text"string"
time{"type": "long", "logicalType": "time-micros"}
uint2{"type": "fixed", "size": 2, "name": "com.materialize.sink.uint2"}
uint4{"type": "fixed", "size": 4, "name": "com.materialize.sink.uint4"}
uint8{"type": "fixed", "size": 8, "name": "com.materialize.sink.uint8"}
timestamp (p)If precision p is less than or equal to 3:
{"type": "long", "logicalType: "timestamp-millis"}
Otherwise:
{"type": "long", "logicalType: "timestamp-micros"}
timestamptz (p)Same as timestamp (p).
Arrays{"type": "array", "items": ...}

If a SQL column is nullable, and its type converts to Avro type t according to the above table, the Avro type generated for that column will be ["null", t], since nullable fields are represented as unions in Avro.

In the case of a sink on a materialized view, Materialize may not be able to infer the non-nullability of columns in all cases, and will conservatively assume the columns are nullable, thus producing a union type as described above. If this is not desired, the materialized view may be created using non-null assertions.

Avro schema documentation

Materialize allows control over the doc attribute for record fields and types in the generated Avro schemas for the sink.

For the container record type (named row for the key schema and envelope for the value schema, unless overridden by the AVRO ... FULLNAME options), Materialize searches for documentation in the following locations, in order:

  1. For the key schema, a KEY DOC ON TYPE option naming the sink's upstream relation. For the value schema, a VALUE DOC ON TYPE option naming the sink's upstream relation.
  2. A comment on the sink's upstream relation.

For record types within the container record type, Materialize searches for documentation in the following locations, in order:

  1. For the key schema, a KEY DOC ON TYPE option naming the SQL type corresponding to the record type. For the value schema, a VALUE DOC ON TYPE option naming the SQL type corresponding to the record type.
  2. A DOC ON TYPE option naming the SQL type corresponding to the record type.
  3. A comment on the SQL type corresponding to the record type.

Similarly, for each field of each record type in the Avro schema, Materialize documentation in the following locations, in order:

  1. For the key schema, a KEY DOC ON COLUMN option naming the SQL column corresponding to the field. For the value schema, a VALUE DOC ON COLUMN option naming the column corresponding to the field.
  2. A DOC ON COLUMN option naming the SQL column corresponding to the field.
  3. A comment on the SQL column corresponding to the field.

For each field or type, Materialize uses the documentation from the first location that exists. If no documentation is found for a given field or type, the doc attribute is omitted for that field or type.

JSON

<p style="font-size:14px"><b>Syntax:</b> <code>FORMAT JSON</code></p>

When using the JSON format, the value of each Kafka message is a JSON object containing a field for each column of the sink's upstream relation. The names and ordering of the fields in the record match the names and ordering of the columns in the relation.

If the KEY option is specified, the key of each Kafka message is a JSON object containing a field for each key column, in the same order and with the same names.

SQL values are converted to JSON values according to the following conversion table:

SQL typeConversion
[array][arrays]Values are converted to JSON arrays.
bigintValues are converted to JSON numbers.
booleanValues are converted to true or false.
integerValues are converted to JSON numbers.
listValues are converted to JSON arrays.
numericValues are converted to a JSON string containing the decimal representation of the number.
recordRecords are converted to JSON objects. The names and ordering of the fields in the object match the names and ordering of the fields in the record.
smallintvalues are converted to JSON numbers.
timestamp
timestamptzValues are converted to JSON strings containing the fractional number of milliseconds since the Unix epoch. The fractional component has microsecond precision (i.e., three digits of precision). Example: "1720032185.312"
uint2Values are converted to JSON numbers.
uint4Values are converted to JSON numbers.
uint8Values are converted to JSON numbers.
OtherValues are cast to text and then converted to JSON strings.

Text/Bytes

The TEXT and BYTES format options only support single-column encoding and cannot be used for keys or values with multiple columns.

Additionally, the BYTES format only works with scalar data types.

Envelopes

The sink's envelope determines how changes to the sink's upstream relation are mapped to Kafka messages.

There are two fundamental types of change events:

  • An insertion event is the addition of a new row to the upstream relation.
  • A deletion event is the removal of an existing row from the upstream relation.

When a KEY is specified, an insertion event and deletion event that occur at the same time are paired together into a single update event that contains both the old and new value for the given key.

Upsert

<p style="font-size:14px"><b>Syntax:</b> <code>ENVELOPE UPSERT</code></p>

The upsert envelope:

  • Requires that you specify a unique key for the sink's upstream relation using the KEY option. See upsert key selection for details.
  • For an insertion event, emits the row without additional decoration.
  • For an update event, emits the new row without additional decoration. The old row is not emitted.
  • For a deletion event, emits a message with a null value (i.e., a tombstone).

Consider using the upsert envelope if:

  • You need to follow standard Kafka conventions for upsert semantics.
  • You want to enable key-based compaction on the sink's Kafka topic while retaining the most recent value for each key.

Debezium

<p style="font-size:14px"><b>Syntax:</b> <code>ENVELOPE DEBEZIUM</code></p>

The Debezium envelope wraps each event in an object containing a before and after field to indicate whether the event was an insertion, deletion, or update event:

json
// Insertion event.
{"before": null, "after": {"field1": "val1", ...}}

// Deletion event.
{"before": {"field1": "val1", ...}, "after": null}

// Update event.
{"before": {"field1": "oldval1", ...}, "after": {"field1": "newval1", ...}}

Note that the sink will only produce update events if a KEY is specified.

Consider using the Debezium envelope if:

  • You have downstream consumers that want update events to contain both the old and new value of the row.
  • There is no natural KEY for the sink.

Features

Automatic topic creation

If the specified Kafka topic does not exist, Materialize will attempt to create it using the broker's default number of partitions, default replication factor, default compaction policy, and default retention policy, unless any specific overrides are provided as part of the connection options.

If the connection's progress topic does not exist, Materialize will attempt to create it with a single partition, the broker's default replication factor, compaction enabled, and both size- and time-based retention disabled. The replication factor can be overridden using the PROGRESS TOPIC REPLICATION FACTOR option when creating a connection CREATE CONNECTION.

To customize topic-level configuration, including compaction settings and other values, use the TOPIC CONFIG option in the connection options to set any relevant kafka topic configs.

If you manually create the topic or progress topic in Kafka before running CREATE SINK, observe the following guidance:

TopicConfigurationGuidance
Data topicPartition countYour choice, based on your performance and ordering requirements.
Data topicReplication factorYour choice, based on your durability requirements.
Data topicCompactionYour choice, based on your downstream applications' requirements. If using the Upsert envelope, enabling compaction is typically the right choice.
Data topicRetentionYour choice, based on your downstream applications' requirements.
Progress topicPartition countMust be set to 1. Using multiple partitions can cause Materialize to violate its exactly-once guarantees.
Progress topicReplication factorYour choice, based on your durability requirements.
Progress topicCompactionWe recommend enabling compaction to avoid accumulating unbounded state. Disabling compaction may cause performance issues, but will not cause correctness issues.
Progress topicRetentionMust be disabled. Enabling retention can cause Materialize to violate its exactly-once guarantees.
Progress topicTiered storageWe recommend disabling tiered storage to allow for more aggressive data compaction. Fully compacted data requires minimal storage, typically only tens of bytes per sink, making it cost-effective to maintain directly on local disk.
{{< warning >}}
{{% kafka-sink-drop %}}
{{</ warning >}}

Exactly-once processing

By default, Kafka sinks provide exactly-once processing guarantees, which ensures that messages are not duplicated or dropped in failure scenarios.

To achieve this, Materialize stores some internal metadata in an additional progress topic. This topic is shared among all sinks that use a particular Kafka connection. The name of the progress topic can be specified when creating a connection; otherwise, a default name of _materialize-progress-{REGION ID}-{CONNECTION ID} is used. In either case, Materialize will attempt to create the topic if it does not exist. The contents of this topic are not user-specified.

End-to-end exactly-once processing

Exactly-once semantics are an end-to-end property of a system, but Materialize only controls the initial produce step. To ensure end-to-end exactly-once message delivery, you should ensure that:

  • The broker is configured with replication factor greater than 3, with unclean leader election disabled (unclean.leader.election.enable=false).
  • All downstream consumers are configured to only read committed data (isolation.level=read_committed).
  • The consumers' processing is idempotent, and offsets are only committed when processing is complete.

For more details, see the Kafka documentation.

Partitioning

By default, Materialize assigns a partition to each message using the following strategy:

  1. Encode the message's key in the specified format.
  2. If the format uses a Confluent Schema Registry, strip out the schema ID from the encoded bytes.
  3. Hash the remaining encoded bytes using SeaHash.
  4. Divide the hash value by the topic's partition count and assign the remainder as the message's partition.

If a message has no key, all messages are sent to partition 0.

To configure a custom partitioning strategy, you can use the PARTITION BY option. This option allows you to specify a SQL expression that computes a hash for each message, which determines what partition to assign to the message:

sql
-- General syntax.
CREATE SINK ... INTO KAFKA CONNECTION <name> (PARTITION BY = <expression>) ...;

-- Example.
CREATE SINK ... INTO KAFKA CONNECTION <name> (
    PARTITION BY = kafka_murmur2(name || address)
) ...;

The expression:

  • Must have a type that can be assignment cast to uint8.
  • Can refer to any column in the sink's underlying relation when using the upsert envelope.
  • Can refer to any column in the sink's key when using the Debezium envelope.

Materialize uses the computed hash value to assign a partition to each message as follows:

  1. If the hash is NULL or computing the hash produces an error, assign partition 0.
  2. Otherwise, divide the hash value by the topic's partition count and assign the remainder as the message's partition (i.e., partition_id = hash % partition_count).

Materialize provides several hash functions which are commonly used in Kafka partition assignment:

  • crc32
  • kafka_murmur2
  • seahash

For a full example of using the PARTITION BY option, see Custom partioning.

Required privileges

To execute the CREATE SINK command, you need:

{{% include-headless "/headless/sql-command-privileges/create-sink" %}}

See also Required Kafka ACLs.

Required Kafka ACLs

The access control lists (ACLs) on the Kafka cluster must allow Materialize to perform the following operations on the following resources:

Operation typeResource typeResource name
Read, WriteTopicConsult mz_kafka_connections.sink_progress_topic for the sink's connection
WriteTopicThe specified TOPIC option
WriteTransactional IDAll transactional IDs beginning with the specified TRANSACTIONAL ID PREFIX option
ReadGroupAll group IDs beginning with the specified PROGRESS GROUP ID PREFIX option

When using automatic topic creation, Materialize additionally requires access to the following operations:

Operation typeResource typeResource name
DescribeConfigsClustern/a
CreateTopicThe specified TOPIC option

Kafka transaction markers

{{< include-md file="shared-content/kafka-transaction-markers.md" >}}

Troubleshooting

Upsert key selection

The KEY that you specify for an upsert envelope sink must be a unique key of the sink's upstream relation.

Materialize will attempt to validate the uniqueness of the specified key. If validation fails, you'll receive an error message like one of the following:

ERROR:  upsert key could not be validated as unique
DETAIL: Materialize could not prove that the specified upsert envelope key
("col1") is a unique key of the upstream relation. There are no known
valid unique keys for the upstream relation.

ERROR:  upsert key could not be validated as unique
DETAIL: Materialize could not prove that the specified upsert envelope key
("col1") is a unique key of the upstream relation. The following keys
are known to be unique for the upstream relation:
  ("col2")
  ("col3", "col4")

The first error message indicates that Materialize could not prove the existence of any unique keys for the sink's upstream relation. The second error message indicates that Materialize could prove that col2 and (col3, col4) were unique keys of the sink's upstream relation, but could not provide the uniqueness of the specified upsert key of col1.

There are three ways to resolve this error:

  • Change the sink to use one of the keys that Materialize determined to be unique, if such a key exists and has the appropriate semantics for your use case.

  • Create a materialized view that deduplicates the input relation by the desired upsert key:

    mzsql
    -- For each row with the same key `k`, the `ORDER BY` clause ensures we
    -- keep the row with the largest value of `v`.
    CREATE MATERIALIZED VIEW deduped AS
    SELECT DISTINCT ON (k) v
    FROM original_input
    ORDER BY k, v DESC;
    
    -- Materialize can now prove that `k` is a unique key of `deduped`.
    CREATE SINK s
    FROM deduped
    INTO KAFKA CONNECTION kafka_connection (TOPIC 't')
    KEY (k)
    FORMAT JSON ENVELOPE UPSERT;
    

    {{< note >}} Maintaining the deduped materialized view requires memory proportional to the number of records in original_input. Be sure to assign deduped to a cluster with adequate resources to handle your data volume. {{< /note >}}

  • Use the NOT ENFORCED clause to disable Materialize's validation of the key's uniqueness:

    mzsql
    CREATE SINK s
    FROM original_input
    INTO KAFKA CONNECTION kafka_connection (TOPIC 't')
    -- We have outside knowledge that `k` is a unique key of `original_input`, but
    -- Materialize cannot prove this, so we disable its key uniqueness check.
    KEY (k) NOT ENFORCED
    FORMAT JSON ENVELOPE UPSERT;
    

    You should only disable this verification if you have outside knowledge of the properties of your data that guarantees the uniqueness of the key you have specified.

    {{< warning >}} If the key is not in fact unique, downstream consumers may not be able to correctly interpret the data in the topic, and Kafka key compaction may incorrectly garbage collect records from the topic. {{< /warning >}}

Examples

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to write data to.

Once created, a connection is reusable across multiple CREATE SINK statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

Broker

{{< tabs tabID="1" >}} {{< tab "SSL">}}

mzsql
CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);

{{< /tab >}} {{< tab "SASL">}}

mzsql
CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);

{{< /tab >}} {{< /tabs >}}

Confluent Schema Registry

{{< tabs tabID="1" >}} {{< tab "SSL">}}

mzsql
CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_ssl TO CONFLUENT SCHEMA REGISTRY (
    URL 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET csr_ssl_key,
    SSL CERTIFICATE = SECRET csr_ssl_crt,
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password
);

{{< /tab >}} {{< tab "Basic HTTP Authentication">}}

mzsql
CREATE SECRET IF NOT EXISTS csr_username AS '<CSR_USERNAME>';
CREATE SECRET IF NOT EXISTS csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_basic_http
  FOR CONFLUENT SCHEMA REGISTRY
  URL '<CONFLUENT_REGISTRY_URL>',
  USERNAME = SECRET csr_username,
  PASSWORD = SECRET csr_password;

{{< /tab >}} {{< /tabs >}}

Creating a sink

Upsert envelope

{{< tabs >}} {{< tab "Avro">}}

mzsql
CREATE SINK avro_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  KEY (key_col)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

{{< /tab >}} {{< tab "JSON">}}

mzsql
CREATE SINK json_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_json_topic')
  KEY (key_col)
  FORMAT JSON
  ENVELOPE UPSERT;

{{< /tab >}} {{< /tabs >}}

Debezium envelope

{{< tabs >}} {{< tab "Avro">}}

mzsql
CREATE SINK avro_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;

{{< /tab >}} {{< /tabs >}}

Topic configuration

mzsql
CREATE SINK custom_topic_sink
  IN CLUSTER my_io_cluster
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'test_avro_topic',
    TOPIC PARTITION COUNT 4,
    TOPIC REPLICATION FACTOR 2,
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

Schema compatibility levels

mzsql
CREATE SINK compatibility_level_sink
  IN CLUSTER my_io_cluster
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'test_avro_topic',
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD_TRANSITIVE'
  )
  ENVELOPE UPSERT;

Documentation comments

Consider the following sink, docs_sink, built on top of a relation t with several SQL comments attached.

mzsql
CREATE TABLE t (key int NOT NULL, value text NOT NULL);
COMMENT ON TABLE t IS 'SQL comment on t';
COMMENT ON COLUMN t.value IS 'SQL comment on t.value';

CREATE SINK docs_sink
FROM t
INTO KAFKA CONNECTION kafka_connection (TOPIC 'doc-commont-example')
KEY (key)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    DOC ON TYPE t = 'Top-level comment for container record in both key and value schemas',
    KEY DOC ON COLUMN t.key = 'Comment on column only in key schema',
    VALUE DOC ON COLUMN t.key = 'Comment on column only in value schema'
)
ENVELOPE UPSERT;

When docs_sink is created, Materialize will publish the following Avro schemas to the Confluent Schema Registry:

  • Key schema:

    json
    {
      "type": "record",
      "name": "row",
      "doc": "Top-level comment for container record in both key and value schemas",
      "fields": [
        {
          "name": "key",
          "type": "int",
          "doc": "Comment on column only in key schema"
        }
      ]
    }
    
  • Value schema:

    json
    {
      "type": "record",
      "name": "envelope",
      "doc": "Top-level comment for container record in both key and value schemas",
      "fields": [
        {
          "name": "key",
          "type": "int",
          "doc": "Comment on column only in value schema"
        },
        {
          "name": "value",
          "type": "string",
          "doc": "SQL comment on t.value"
        }
      ]
    }
    

See Avro schema documentation for details about the rules by which Materialize attaches doc fields to records.

Custom partitioning

Suppose your Materialize deployment stores data about customers and their orders. You want to emit the order data to Kafka with upsert semantics so that only the latest state of each order is retained. However, you want the data to be partitioned by only customer ID (i.e., not order ID), so that all orders for a given customer go to the same partition.

Create a sink using the PARTITION BY option to accomplish this:

sql
CREATE SINK customer_orders
  FROM ...
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'customer-orders',
    -- The partition hash includes only the customer ID, so the partition
    -- will be assigned only based on the customer ID.
    PARTITION BY = seahash(customer_id::text)
  )
  -- The key includes both the customer ID and order ID, so Kafka's compaction
  -- will keep only the latest message for each order ID.
  KEY (customer_id, order_id)
  FORMAT JSON
  ENVELOPE UPSERT;