# AvroConfluent
import DataTypesMatching from './_snippets/data-types-matching.md'
| Input | Output | Alias |
|-------|--------|-------|
| ✔     | ✗      |       |
Apache Avro is a row-oriented serialization format that uses binary encoding for efficient data processing. The AvroConfluent format supports decoding single-object, Avro-encoded Kafka messages serialized using the Confluent Schema Registry (or API-compatible services).
Each Avro message embeds a schema ID that ClickHouse automatically resolves by querying the configured schema registry. Once resolved, schemas are cached for optimal performance.
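For illustration, the Confluent wire format prefixes each Avro body with a one-byte magic marker (`0x00`) and a 4-byte big-endian schema registry ID. A minimal Python sketch of parsing this framing (not ClickHouse's actual implementation):

```python
import struct

def parse_confluent_header(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload).

    The wire format is: 1 magic byte (0x00), then a 4-byte big-endian
    schema ID, then the raw Avro-encoded body.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]

# Example: a message referencing schema ID 42 with a dummy Avro body.
framed = b"\x00" + struct.pack(">I", 42) + b"\x02avro-bytes"
schema_id, payload = parse_confluent_header(framed)
print(schema_id)  # → 42
```

ClickHouse performs this split internally before resolving the schema ID against the configured registry.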
## Data type mapping {#data-types-matching}

<DataTypesMatching/>
| Setting | Description | Default |
|---------|-------------|---------|
| `input_format_avro_allow_missing_fields` | Whether to use a default value instead of throwing an error when a field is not found in the schema. | `0` |
| `input_format_avro_null_as_default` | Whether to use a default value instead of throwing an error when inserting a `NULL` value into a non-nullable column. | `0` |
| `format_avro_schema_registry_url` | The Confluent Schema Registry URL. For basic authentication, URL-encoded credentials can be included directly in the URL path. | |
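Behind the scenes, resolving a schema ID amounts to an HTTP lookup against the registry's REST API (`GET /schemas/ids/{id}`), with results cached per ID. A rough Python sketch of that flow, assuming a Confluent-compatible registry (the cache and `fetch` hook are illustrative, not ClickHouse internals):

```python
import json
from urllib.request import urlopen

_schema_cache: dict[int, dict] = {}

def resolve_schema(registry_url: str, schema_id: int, fetch=None) -> dict:
    """Fetch an Avro schema by ID from a Confluent-compatible registry,
    caching results so each ID is looked up at most once."""
    if schema_id in _schema_cache:
        return _schema_cache[schema_id]
    if fetch is None:
        def fetch(url):
            # Confluent REST API: GET /schemas/ids/{id}
            with urlopen(url) as resp:
                return json.load(resp)
    body = fetch(f"{registry_url}/schemas/ids/{schema_id}")
    # The registry returns the Avro schema as a JSON-encoded string field.
    schema = json.loads(body["schema"])
    _schema_cache[schema_id] = schema
    return schema
```

Because of the cache, a stream of messages sharing the same schema ID triggers only one registry round trip.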
To read an Avro-encoded Kafka topic using the Kafka table engine, use the `format_avro_schema_registry_url` setting to provide the URL of the schema registry.
```sql
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'AvroConfluent',
    format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;
```
If your schema registry requires basic authentication (e.g., if you're using Confluent Cloud), you can provide URL-encoded credentials in the `format_avro_schema_registry_url` setting.
```sql
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'AvroConfluent',
    format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
```
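Credentials containing reserved URL characters (such as `@`, `:`, or `/`) must be percent-encoded before being embedded in the URL. A minimal sketch using Python's standard library (the username and password here are made up):

```python
from urllib.parse import quote

# Hypothetical credentials containing characters that must be escaped.
username = "svc-user"
password = "p@ss:w/rd"

# quote() with safe="" also encodes '/', which is required inside the
# userinfo portion of a URL.
registry_url = (
    f"https://{quote(username, safe='')}:{quote(password, safe='')}"
    "@schema-registry-url"
)
print(registry_url)
# → https://svc-user:p%40ss%3Aw%2Frd@schema-registry-url
```

The resulting string can be used directly as the value of `format_avro_schema_registry_url`.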
To monitor ingestion progress and debug errors with the Kafka consumer, you can query the `system.kafka_consumers` system table. If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the `clusterAllReplicas` table function.
```sql
SELECT * FROM clusterAllReplicas('default', system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
```
If you run into schema resolution issues, you can use `kafkacat` with `clickhouse-local` to troubleshoot:
```bash
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | \
  clickhouse-local --input-format AvroConfluent \
    --format_avro_schema_registry_url 'http://schema-registry' \
    -S "field1 Int64, field2 String" \
    -q 'select * from table'
1 a
2 b
3 c
```