
Ingest data from Self-hosted Kafka

This guide walks through the steps required to connect Materialize to a self-hosted Kafka cluster.

{{< tip >}} {{< guided-tour-blurb-for-ingest-data >}} {{< /tip >}}

Before you begin

Before you begin, you must have:

  • A Kafka cluster running Kafka 3.2 or later.
  • A client machine that can interact with your cluster.

Configure network security

There are various ways to configure your Kafka network to allow Materialize to connect:

  • Use AWS PrivateLink: If your Kafka cluster is running on AWS, you can use AWS PrivateLink to connect Materialize to the cluster.

  • Use an SSH tunnel: If your Kafka cluster is running in a private network, you can use an SSH tunnel to connect Materialize to the cluster.

  • Allow Materialize IPs: If your Kafka cluster is publicly accessible, you can configure your firewall to allow connections from a set of static Materialize IP addresses.

Select the option that works best for you.

{{< tabs >}}

{{< tab "Cloud" >}} {{< tabs tabID="1" >}}

{{< tab "Privatelink">}}

{{< note >}} Materialize provides Terraform modules for both Amazon MSK clusters and self-managed Kafka clusters, which can be used to create the target groups for each Kafka broker (step 1), the network load balancer (step 2), the TCP listeners (step 3), and the VPC endpoint service (step 5).

{{< /note >}}

{{% network-security/privatelink-kafka %}}

{{< /tab >}}

{{< tab "SSH Tunnel">}}

{{% network-security/ssh-tunnel %}}

  1. In Materialize, create a source connection that uses the SSH tunnel connection you configured in the previous section:

    ```mzsql
    CREATE CONNECTION kafka_connection TO KAFKA (
      BROKER 'broker1:9092',
      SSH TUNNEL ssh_connection
    );
    ```
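If your cluster has more than one broker, the connection can list them all so Materialize does not depend on a single broker being reachable. A minimal sketch, assuming the same `ssh_connection` from the previous step and hypothetical broker addresses:

```mzsql
-- Hypothetical broker addresses; replace with your own.
CREATE CONNECTION kafka_connection TO KAFKA (
  BROKERS ('broker1:9092', 'broker2:9092'),
  SSH TUNNEL ssh_connection
);
```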

{{< /tab >}}

{{< tab "Allow Materialize IPs">}}

  1. In the SQL Shell, or your preferred SQL client connected to Materialize, find the static egress IP addresses for the Materialize region you are running in:

    ```mzsql
    SELECT * FROM mz_egress_ips;
    ```
  2. Update your Kafka cluster firewall rules to allow traffic from each IP address from the previous step.

  3. Create a Kafka connection that references your Kafka cluster:

    ```mzsql
    CREATE SECRET kafka_password AS '<your-password>';

    CREATE CONNECTION kafka_connection TO KAFKA (
        BROKER '<broker-url>',
        SASL MECHANISMS = 'SCRAM-SHA-512',
        SASL USERNAME = '<your-username>',
        SASL PASSWORD = SECRET kafka_password
    );
    ```

{{< /tab >}}

{{< /tabs >}} {{< /tab >}} {{< tab "Self-Managed" >}}

There are various ways to configure your Kafka network to allow Materialize to connect:

  • Use an SSH tunnel: If your Kafka cluster is running in a private network, you can use an SSH tunnel to connect Materialize to the cluster.

  • Allow Materialize IPs: If your Kafka cluster is publicly accessible, you can configure your firewall to allow connections from a set of static Materialize IP addresses.

Select the option that works best for you.

{{< tabs >}}

{{< tab "SSH Tunnel">}}

{{% network-security/ssh-tunnel-sm %}}

  1. In Materialize, create a source connection that uses the SSH tunnel connection you configured in the previous section:

    ```mzsql
    CREATE CONNECTION kafka_connection TO KAFKA (
      BROKER 'broker1:9092',
      SSH TUNNEL ssh_connection
    );
    ```

{{< /tab >}}

{{< tab "Allow Materialize IPs">}}

  1. Update your Kafka cluster firewall rules to allow traffic from Materialize.

  2. Create a Kafka connection that references your Kafka cluster:

    ```mzsql
    CREATE SECRET kafka_password AS '<your-password>';

    CREATE CONNECTION kafka_connection TO KAFKA (
        BROKER '<broker-url>',
        SASL MECHANISMS = 'SCRAM-SHA-512',
        SASL USERNAME = '<your-username>',
        SASL PASSWORD = SECRET kafka_password
    );
    ```

{{< /tab >}}

{{< /tabs >}} {{< /tab >}} {{< /tabs >}}
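
Whichever network configuration you choose, you can check that Materialize can reach and authenticate with your brokers before creating any sources. A minimal sketch, assuming the `kafka_connection` name used above:

```mzsql
VALIDATE CONNECTION kafka_connection;
```

If validation fails, the error message typically indicates whether the problem is network reachability or authentication.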

Creating a source

The Kafka connection created in the previous section can then be reused across multiple CREATE SOURCE statements:

```mzsql
CREATE SOURCE json_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT JSON;
```

By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.
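For example, a sketch assuming a cluster named `ingest_cluster` already exists:

```mzsql
-- "ingest_cluster" is a hypothetical cluster name; create or substitute your own.
CREATE SOURCE json_source
  IN CLUSTER ingest_cluster
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT JSON;
```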