doc/developer/design/20220404_connectors.md
This document proposes a design for connectors, a new type of catalog object that allows common configuration parameters to be shared across sources and sinks.
Many users of Materialize create families of sources and sinks that share many
configuration parameters. The current design of CREATE SOURCE and CREATE SINK makes this a verbose affair. The problem is particularly acute with
Avro-formatted Kafka sources that configure authentication:
CREATE SOURCE kafka1
FROM KAFKA BROKER 'kafka:9092' TOPIC 'top1' WITH (
security_protocol = 'SASL_SSL',
sasl_mechanisms = 'PLAIN',
sasl_username = 'username',
sasl_password = 'password'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry' WITH (
username = 'username',
password = 'password'
);
CREATE SOURCE kafka2
FROM KAFKA BROKER 'kafka:9092' TOPIC 'top2' WITH (
security_protocol = 'SASL_SSL',
sasl_mechanisms = 'PLAIN',
sasl_username = 'username',
sasl_password = 'password'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry' WITH (
username = 'username',
password = 'password'
);
These two source definitions differ only in their topic specification, but must duplicate eight other configuration parameters.
With the connectors proposed in this document, the CREATE SOURCE statements can instead
share all the relevant configuration:
CREATE CONNECTOR kafka FOR
KAFKA BROKER 'kafka:9092' WITH (
security_protocol = 'SASL_SSL',
sasl_mechanisms = 'PLAIN',
sasl_username = 'username',
sasl_password = 'password'
);
CREATE CONNECTOR schema_registry FOR
CONFLUENT SCHEMA REGISTRY 'https://schema-registry' WITH (
username = 'username',
password = 'password'
);
CREATE SOURCE kafka1
FROM KAFKA CONNECTOR kafka TOPIC 'top1'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTOR schema_registry;
CREATE SOURCE kafka2
FROM KAFKA CONNECTOR kafka TOPIC 'top2'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTOR schema_registry;
The eventual goal is for CREATE CONNECTOR syntax to come in type-specific variants that avoid WITH options except for details specific to a client implementation. The initial implementation, however, will preserve the existing option syntax for connectors, so that the work of separating connectors into their own catalog objects does not become entangled with reworking the overall connection option syntax.
All Sources in the catalog will have a reference to their connector, and that reference will also indicate whether the connector is implicit or explicit. Implicit connectors are created when a traditional CREATE SOURCE command is executed and are invisible unless the catalog is directly inspected, i.e., SHOW CREATE SOURCE for a source with an implicit connector will return the same output as if connectors did not exist. Explicit connectors are created with CREATE CONNECTOR commands and used by specifying the connector in the CREATE SOURCE statement.
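The implicit/explicit distinction can be illustrated with a small model. This is a minimal sketch in Python, not the catalog implementation: the `Connector` and `Source` classes, the `show_create_source` helper, and the generated connector name `hidden_1` are all hypothetical names introduced for illustration, and the rendered SQL is simplified to a single line.

```python
from dataclasses import dataclass


@dataclass
class Connector:
    name: str        # user-chosen, or generated for implicit connectors (hypothetical)
    broker: str
    options: dict    # WITH options, e.g. {"security_protocol": "SASL_SSL"}
    implicit: bool   # True when created by a traditional CREATE SOURCE


@dataclass
class Source:
    name: str
    topic: str
    connector: Connector  # every source references a connector


def show_create_source(source: Source) -> str:
    """Render a simplified CREATE SOURCE statement for a source."""
    conn = source.connector
    if conn.implicit:
        # Implicit connectors stay invisible: inline the options as if
        # connectors did not exist.
        opts = ", ".join(f"{k} = '{v}'" for k, v in conn.options.items())
        return (f"CREATE SOURCE {source.name} FROM KAFKA BROKER '{conn.broker}' "
                f"TOPIC '{source.topic}' WITH ({opts})")
    # Explicit connectors are referenced by name.
    return (f"CREATE SOURCE {source.name} FROM KAFKA CONNECTOR {conn.name} "
            f"TOPIC '{source.topic}'")
```

Under this model, a source backed by an implicit connector never exposes the generated connector name, while a source backed by an explicit connector renders the `CONNECTOR` reference and leaves the shared options out of its own definition.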
1. Ship connectors as experimental, with support for implicit and explicit connectors, using syntax otherwise identical to today's (WITH options, etc.) in CREATE SOURCE statements when enabled and when reading from the catalog in all cases.
2. During the experimental phase, evaluate restructuring the syntax to remove as many WITH options as is reasonable, limiting them to client implementation details such as librdkafka options which are not generic Kafka options.
3. Remove the experimental status from connectors.
4. Add ALTER CONNECTOR, with transactional consistency in the catalog but eventual consistency for existing Sources, as an experimental feature:
   - A CREATE which follows an ALTER will be created at the STORAGE layer with the values set in the ALTER.
   - An ALTER which follows a CREATE will take non-zero time to propagate to the STORAGE layer.
5. Make ALTER CONNECTOR stable once its semantics and implementation are stable.

The longer term syntax for CREATE CONNECTOR is expected to look similar to these examples.
CREATE CONNECTOR <connector_name>
FOR KAFKA
BROKER <value>
SECURITY [=] <security_options>
security_options ::=
SSL (
KEY FILE [[=] <value>]
CERTIFICATE FILE [[=] <value>]
KEY PASSWORD [[=] <value>] )
| SASL (
MECHANISMS <value>
USERNAME [[=] <value> ]
PASSWORD [[=] <value> ]
PASSWORD ENV [[=] <value> ] )
| GSSAPI (
KEYTAB [=] <value>
KINIT CMD [=] <value>
RELOGIN TIME [=] <value>
PRINCIPAL [=] <value>
SERVICE [=] <value> )
CREATE CONNECTOR <connector_name>
FOR CONFLUENT SCHEMA REGISTRY
<registry_url>
SECURITY [[=] <security_options>]
security_options ::=
USERNAME [=] <value>
PASSWORD [=] <value>
SSL (
CERTIFICATE FILE [[=] <value> ]
KEY FILE [[=] <value> ]
CA FILE [[=] <value> ] )
CREATE CONNECTOR <connector_name>
FOR POSTGRES
CONNECTION (
( HOST [=] <value>
PORT [=] <value> )
| HOST ADDR [[=] <value> ] )
DB [[=] <value> ]
USER [=] <value>
SSL (
MODE [[=] <value>]
CERTIFICATE FILE [[=] <value>]
KEY FILE [[=] <value>]
SNI [[=] <value>] )
CREATE CONNECTOR <connector_name>
FOR AWS
ACCESS KEY ID [[=] <value>]
SECRET ACCESS KEY [[=] <value>]
TOKEN [[=] <value>]
PROFILE [[=] <value>]
ROLE ARN [[=] <value>]
REGION [[=] <value>]
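Returning to the ALTER CONNECTOR semantics described earlier (transactional consistency in the catalog, eventual consistency for existing Sources), the intended ordering guarantees can be sketched with a toy model. This is an illustrative Python sketch under stated assumptions, not the proposed implementation: the `Catalog` class, its method names, and the explicit `propagate` step standing in for asynchronous propagation to the STORAGE layer are all hypothetical.

```python
class Catalog:
    """Toy model: the catalog updates immediately, STORAGE catches up later."""

    def __init__(self):
        self.connectors = {}  # connector name -> options (transactionally consistent view)
        self.bindings = {}    # source name -> connector name
        self.storage = {}     # source name -> options the STORAGE layer is running with
        self.pending = []     # altered connectors not yet propagated to STORAGE

    def create_connector(self, name, options):
        self.connectors[name] = dict(options)

    def create_source(self, source, connector):
        # A CREATE always reads the current catalog state, so a CREATE that
        # follows an ALTER starts with the altered values.
        self.bindings[source] = connector
        self.storage[source] = dict(self.connectors[connector])

    def alter_connector(self, name, **changes):
        # The catalog changes at once; existing sources are updated later.
        self.connectors[name].update(changes)
        self.pending.append(name)

    def propagate(self):
        # Stand-in for the non-zero time an ALTER takes to reach STORAGE.
        for name in self.pending:
            for source, bound in self.bindings.items():
                if bound == name:
                    self.storage[source] = dict(self.connectors[name])
        self.pending.clear()
```

In this model, a source created before an ALTER keeps running with the old values until `propagate` runs, while a source created after the ALTER sees the new values immediately.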