Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.
✅ Fully Supported:
:::warning Considerations

- For connectors not listed in the supported connector table above, use the `generic_connectors` configuration to provide explicit lineage mappings.
- Some advanced connector-specific features may not be fully supported.

:::
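For reference, here is a minimal sketch of a `generic_connectors` entry; the connector and dataset names are hypothetical, so check the configuration reference for the exact field list before using it:

```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    generic_connectors:
      # Hypothetical custom connector not covered by the supported connector tables
      - connector_name: my_custom_source_connector
        source_platform: postgres
        source_dataset: mydb.public.my_table
```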
Environment Detection: Automatically detects the environment based on `connect_uri` patterns containing `confluent.cloud`.
Self-hosted deployments use the `/connectors/{name}/topics` API endpoint for maximum accuracy:

```yaml
source:
  type: kafka-connect
  config:
    # Self-hosted Kafka Connect cluster
    connect_uri: "http://localhost:8083"
    # use_connect_topics_api: true # Default - enables runtime topic discovery
```
Recommended approach using environment and cluster IDs:
```yaml
source:
  type: kafka-connect
  config:
    # Auto-construct URI from environment and cluster IDs (recommended)
    confluent_cloud_environment_id: "env-xyz123" # Your Confluent Cloud environment ID
    confluent_cloud_cluster_id: "lkc-abc456" # Your Kafka Connect cluster ID

    # Standard credentials for Kafka Connect API
    username: "your-connect-api-key" # API key for Kafka Connect access
    password: "your-connect-api-secret" # API secret for Kafka Connect access

    # Optional: Separate credentials for Kafka REST API (if different from Connect API)
    kafka_api_key: "your-kafka-api-key" # API key for Kafka REST API access
    kafka_api_secret: "your-kafka-api-secret" # API secret for Kafka REST API access

    # Optional: Dedicated Kafka REST endpoint for comprehensive topic retrieval
    kafka_rest_endpoint: "https://pkc-xxxxx.region.provider.confluent.cloud"

    # use_connect_topics_api: true # Default - enables comprehensive topic retrieval
```
Alternative approach using full URI (legacy):
```yaml
source:
  type: kafka-connect
  config:
    # Confluent Cloud Connect URI - automatically detected
    connect_uri: "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"
    username: "your-connect-api-key" # API key for Kafka Connect
    password: "your-connect-api-secret" # API secret for Kafka Connect
    kafka_api_key: "your-kafka-api-key" # API key for Kafka REST API (if different)
    kafka_api_secret: "your-kafka-api-secret" # API secret for Kafka REST API (if different)

    # Optional: Dedicated Kafka REST endpoint for comprehensive topic retrieval
    kafka_rest_endpoint: "https://pkc-xxxxx.region.provider.confluent.cloud"
```
The `use_connect_topics_api` flag controls topic retrieval behavior:

- `true` (default): Uses environment-specific topic discovery with full transform support
- `false`: Disables all topic discovery for air-gapped environments or performance optimization

The new reverse transform pipeline strategy handles complex scenarios automatically:
### Example: EventRouter + RegexRouter chain
```yaml
transforms: EventRouter,RegexRouter
transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
transforms.RegexRouter.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexRouter.regex: "outbox\\.event\\.(.*)"
transforms.RegexRouter.replacement: "events.$1"
```
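As a rough walk-through of the chain above (table and topic names here are hypothetical):

```yaml
# 1. EventRouter (outbox pattern) routes an outbox row to a topic such as outbox.event.orders
# 2. RegexRouter matches "outbox\.event\.(.*)" and rewrites it using "events.$1"
# 3. The connector therefore produces events.orders
# DataHub applies the same chain when mapping source tables to the final topics.
```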
- Unsupported connectors can be covered via the `generic_connectors` configuration
- Set `use_connect_topics_api: false` to disable topic discovery in air-gapped environments

Source connectors:

| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
|---|---|---|---|---|
| Platform JDBC Source<br/>`io.confluent.connect.jdbc.JdbcSourceConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud PostgreSQL CDC<br/>`PostgresCdcSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud PostgreSQL CDC V2<br/>`PostgresCdcSourceV2` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud MySQL Source<br/>`MySqlSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud MySQL CDC<br/>`MySqlCdcSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Debezium MySQL<br/>`io.debezium.connector.mysql.MySqlConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium PostgreSQL<br/>`io.debezium.connector.postgresql.PostgresConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium SQL Server<br/>`io.debezium.connector.sqlserver.SqlServerConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium Oracle<br/>`io.debezium.connector.oracle.OracleConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium DB2<br/>`io.debezium.connector.db2.Db2Connector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium MongoDB<br/>`io.debezium.connector.mongodb.MongoDbConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Collection → Topic CDC mapping |
| Debezium Vitess<br/>`io.debezium.connector.vitess.VitessConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Table → Topic CDC mapping |
| MongoDB Source<br/>`com.mongodb.kafka.connect.MongoSourceConnector` | ✅ Full | 🔧 Config Required | Runtime API / Manual config | Collection → Topic mapping |
| Generic Connectors | 🔧 Config Required | 🔧 Config Required | User-defined mapping | Custom lineage mapping |

Sink connectors:

| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
|---|---|---|---|---|
| BigQuery Sink<br/>`com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| S3 Sink<br/>`io.confluent.connect.s3.S3SinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → S3 object mapping |
| Snowflake Sink<br/>`com.snowflake.kafka.connector.SnowflakeSinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud PostgreSQL Sink<br/>`PostgresSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud MySQL Sink<br/>`MySqlSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud Snowflake Sink<br/>`SnowflakeSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Debezium JDBC Sink<br/>`io.debezium.connector.jdbc.JdbcSinkConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Topic → Table mapping |
| Confluent JDBC Sink<br/>`io.confluent.connect.jdbc.JdbcSinkConnector` | ✅ Full | ✅ Partial | Runtime API / Config-based | Topic → Table mapping |

Legend:

- ✅ Full: Fully supported out of the box
- ✅ Partial: Supported with limitations (see note below)
- 🔧 Config Required: Requires the `generic_connectors` configuration for lineage mapping

:::info
**On JDBC Sink connectors in Confluent Cloud:** `io.debezium.connector.jdbc.JdbcSinkConnector` and `io.confluent.connect.jdbc.JdbcSinkConnector` are not Confluent Cloud managed connectors — they can only appear as custom (self-managed) connectors deployed against a Confluent Cloud Kafka cluster. When present, DataHub supports lineage extraction for them, but with one limitation: the target platform (e.g. postgres, mysql, oracle, mssql) must be auto-detected from the `connection.url` field in the connector configuration. If `connection.url` is absent or uses an unrecognised JDBC scheme, platform detection will fail and a warning will be emitted. For Confluent Cloud managed JDBC sink connectors, use the dedicated `PostgresSink` or `MySqlSink` connector classes instead, which have explicit platform support.
:::
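For illustration, here is a self-managed JDBC sink configuration where the platform would be inferred from `connection.url`; the host and topic names are made up:

```yaml
# Custom (self-managed) JDBC sink running against a Confluent Cloud Kafka cluster
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
connection.url: "jdbc:postgresql://db.internal:5432/analytics" # scheme => platform detected as postgres
topics: "orders,customers"
```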
DataHub uses an advanced transform pipeline strategy that automatically handles complex transform chains by applying the complete pipeline to all topics and checking whether the resulting topics exist. This provides robust support for any combination of transforms.

The following topic-routing transforms are fully supported:

- `org.apache.kafka.connect.transforms.RegexRouter`
- `io.confluent.connect.cloud.transforms.TopicRegexRouter`
- `io.debezium.transforms.outbox.EventRouter` (Outbox pattern)

DataHub recognizes but passes through these transforms (they don't affect lineage):
DataHub uses an improved reverse transform pipeline approach that:
Benefits:
Kafka Connect connectors can apply transforms (like RegexRouter) that modify topic names before data reaches Kafka. DataHub's lineage inference analyzes these transform configurations to determine how topics are produced:
- Source table/database configuration fields (`table.include.list`, `database.include.list`)

This approach works for both self-hosted and Confluent Cloud environments:
- Self-hosted: queries the `/connectors/{name}/topics` API for actual topics produced by each connector
- Confluent Cloud: retrieves topics via the Kafka REST API (when a `kafka_rest_endpoint` is available)

Key Benefits:
Configuration Options:
- `confluent_cloud_environment_id` and `confluent_cloud_cluster_id` for automatic URI construction
- `kafka_rest_endpoint` if auto-derivation doesn't work
- `connect_api_key`/`connect_api_secret` for the Connect API and `kafka_api_key`/`kafka_api_secret` for the Kafka REST API
- `username`/`password` for the Connect API (falls back for the Kafka API if separate credentials are not provided)

DataHub now provides intelligent topic resolution that works reliably across all environments, including Confluent Cloud, where the Kafka Connect topics API is unavailable.
Source Connectors (Debezium, Snowflake CDC, JDBC):
- Topic names derived from source configuration (`table.include.list`, `database.include.list`)

Sink Connectors (S3, Snowflake, BigQuery, JDBC):
- Topics read from explicit topic lists (`topics` field) and regex patterns (`topics.regex` field)

When `topics.regex` is used:
- Patterns are matched against `manifest.topic_names` from the Kafka API (if available)
- Otherwise, patterns are matched against topics queried from DataHub (requires `use_schema_resolver` enabled)

Source Connector with Pattern Expansion:
```yaml
# Debezium PostgreSQL source with wildcard tables
connector.config:
  table.include.list: "public.analytics_.*"

# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all PostgreSQL tables matching pattern
# 2. Derive expected topic names (server.schema.table format)
# 3. Apply transforms if configured
# 4. Create lineages without Kafka validation
```
Sink Connector with topics.regex (Confluent Cloud):
```yaml
# S3 sink connector consuming from pattern-matched topics
connector.config:
  topics.regex: "analytics\\..*" # Match topics like analytics.users, analytics.orders

# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all Kafka topics (requires use_schema_resolver: true)
# 2. Match topics against the regex pattern
# 3. Create lineages for matched topics
```
Enable DataHub Topic Querying for Sink Connectors:
```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"
    username: "your-connect-api-key"
    password: "your-connect-api-secret"

    # Enable DataHub schema resolver for topic pattern expansion
    use_schema_resolver: true # Required for topics.regex fallback

    # Configure graph connection for DataHub queries
    datahub_gms_url: "http://localhost:8080" # Your DataHub GMS endpoint
```
- Ensures patterns such as `table.include.list` and `topics.regex` are properly expanded

DataHub will query for topics in these scenarios:
Source Connectors:
- Wildcard patterns in `table.include.list` (e.g., `ANALYTICS.PUBLIC.*`)

Sink Connectors:
- When `topics.regex` is used AND the Kafka API is unavailable (Confluent Cloud)
- Requires `use_schema_resolver: true` in configuration

Important Notes:
The Kafka Connect source can query DataHub for schema information to provide two capabilities:
- Pattern expansion: converts wildcard patterns such as `database.*` into actual table names by querying DataHub
- Column-level (fine-grained) lineage between source tables and Kafka topics

Both features require existing metadata in DataHub from your database and Kafka schema registry ingestion.
Starting with the latest version, use_schema_resolver is automatically enabled for Confluent Cloud environments to provide better defaults for enhanced lineage extraction. This gives you column-level lineage and pattern expansion out of the box!
Confluent Cloud (Auto-Enabled):
```yaml
source:
  type: kafka-connect
  config:
    # Confluent Cloud environment
    confluent_cloud_environment_id: "env-xyz123"
    confluent_cloud_cluster_id: "lkc-abc456"
    username: "your-connect-api-key"
    password: "your-connect-api-secret"

    # Schema resolver automatically enabled! ✓
    # use_schema_resolver: true (auto-enabled)
    # schema_resolver_expand_patterns: true (auto-enabled)
    # schema_resolver_finegrained_lineage: true (auto-enabled)
```
To disable (if you don't need these features):
```yaml
source:
  type: kafka-connect
  config:
    confluent_cloud_environment_id: "env-xyz123"
    confluent_cloud_cluster_id: "lkc-abc456"
    use_schema_resolver: false # Explicitly disable auto-enable
```
Self-hosted (Manual Enable Required):
```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"

    # Must explicitly enable for self-hosted
    use_schema_resolver: true

    # DataHub connection
    datahub_api:
      server: "http://localhost:8080"
```
Important Prerequisites:
⚠️ Source database tables must be ingested into DataHub BEFORE running Kafka Connect ingestion
The schema resolver queries DataHub for existing table metadata. If your source databases haven't been ingested yet, the feature will have no effect. Run database ingestion first!
Recommended Ingestion Order:

1. Ingest your source databases (e.g., MySQL, PostgreSQL, Snowflake) so table schemas exist in DataHub
2. Ingest Kafka so topic schemas from the schema registry exist in DataHub
3. Run Kafka Connect ingestion with the schema resolver enabled, for example:
```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"

    # Enable DataHub schema querying (auto-enabled for Confluent Cloud)
    use_schema_resolver: true

    # Control which features to use (both default to true when schema resolver enabled)
    schema_resolver_expand_patterns: true # Expand wildcard patterns
    schema_resolver_finegrained_lineage: true # Generate column-level lineage

    # DataHub connection (required when use_schema_resolver=true)
    datahub_api:
      server: "http://localhost:8080"
      token: "your-datahub-token" # Optional
```
Converts wildcard patterns in connector configurations into actual table names by querying DataHub.
Example: MySQL Source with Wildcards
```yaml
# Connector config contains pattern
connector.config:
  table.include.list: "analytics.user_*" # Pattern: matches user_events, user_profiles, etc.

# DataHub config
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: true

# Result: DataHub queries for MySQL tables matching "analytics.user_*"
# Finds: user_events, user_profiles, user_sessions
# Creates lineage:
#   mysql.analytics.user_events -> kafka.server.analytics.user_events
#   mysql.analytics.user_profiles -> kafka.server.analytics.user_profiles
#   mysql.analytics.user_sessions -> kafka.server.analytics.user_sessions
```
When to use:
- Connector configurations contain wildcard patterns (e.g., `database.*`, `schema.table_*`)

When to skip:
Configuration:
```yaml
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: true # Enable pattern expansion

    # If you only want column-level lineage but NOT pattern expansion:
    # schema_resolver_expand_patterns: false
```
Behavior without schema resolver: Patterns are treated as literal table names, resulting in potentially incorrect lineage.
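A hypothetical illustration of that failure mode (pattern and table names are made up):

```yaml
# Connector config
# table.include.list: "analytics.user_*"
#
# Without the schema resolver, the pattern is taken literally, so lineage points at a
# dataset named "analytics.user_*" instead of analytics.user_events, analytics.user_profiles, ...
```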
Generates field-level lineage by matching column names between source tables and Kafka topics.
Example: PostgreSQL to Kafka CDC
```yaml
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_finegrained_lineage: true

# Source table schema in DataHub:
#   postgres.public.users: [user_id, email, created_at, updated_at]
# Kafka topic schema in DataHub:
#   kafka.server.public.users: [user_id, email, created_at, updated_at]

# Result: Column-level lineage created:
#   postgres.public.users.user_id -> kafka.server.public.users.user_id
#   postgres.public.users.email -> kafka.server.public.users.email
#   postgres.public.users.created_at -> kafka.server.public.users.created_at
#   postgres.public.users.updated_at -> kafka.server.public.users.updated_at
```
Requirements:
Benefits:
ReplaceField Transform Support:
Column-level lineage respects ReplaceField transforms that filter or rename columns:
```yaml
# Connector excludes specific fields
connector.config:
  transforms: "removeFields"
  transforms.removeFields.type: "org.apache.kafka.connect.transforms.ReplaceField$Value"
  transforms.removeFields.exclude: "internal_id,temp_column"

# DataHub behavior:
#   Source schema: [user_id, email, internal_id, temp_column]
#   After transform: [user_id, email]
#   Column lineage created only for: user_id, email
```
Configuration:
```yaml
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_finegrained_lineage: true # Enable column-level lineage

    # If you only want pattern expansion but NOT column-level lineage:
    # schema_resolver_finegrained_lineage: false
```
Behavior without schema resolver:
Only dataset-level lineage is created (e.g., postgres.users -> kafka.users), without field-level detail.
```yaml
source:
  type: kafka-connect
  config:
    # Kafka Connect cluster
    connect_uri: "http://localhost:8083"
    cluster_name: "production-connect"

    # Enable schema resolver features
    use_schema_resolver: true
    schema_resolver_expand_patterns: true # Expand wildcard patterns
    schema_resolver_finegrained_lineage: true # Generate column-level lineage

    # DataHub connection
    datahub_api:
      server: "http://datahub.company.com"
      token: "${DATAHUB_TOKEN}"

    # Platform instances (if using multiple)
    platform_instance_map:
      postgres: "prod-postgres"
      kafka: "prod-kafka"
```
API Calls per Connector:
Optimization:
```yaml
# Minimal configuration - no schema resolver
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    # use_schema_resolver: false # Default - no DataHub queries
```

```yaml
# Pattern expansion only
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: true
    schema_resolver_finegrained_lineage: false # Skip column lineage for faster ingestion
```

```yaml
# Column lineage only
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    schema_resolver_expand_patterns: false # Skip pattern expansion
    schema_resolver_finegrained_lineage: true
```
Best Practice: Run database and Kafka schema ingestion before Kafka Connect ingestion to pre-populate DataHub with schema metadata.
"Pattern expansion found no matches for: analytics.*"
Causes:
Solutions:
- Verify that `platform_instance_map` matches the database ingestion config

"SchemaResolver not available: DataHub graph connection is not available"
Causes:
- Missing `datahub_api` configuration

Solutions:
```yaml
source:
  type: kafka-connect
  config:
    use_schema_resolver: true
    datahub_api:
      server: "http://localhost:8080" # Add DataHub GMS URL
      token: "your-token" # Add if authentication enabled
```
Column-level lineage not appearing
Check:
Slow ingestion with schema resolver enabled
Profile:
Temporarily disable to compare:
```yaml
use_schema_resolver: false
```
If you have multiple instances of Kafka, or of the source/sink systems referenced in your Kafka Connect setup, you need to configure platform instances for these systems in the kafka-connect recipe to generate correct lineage edges. You must have already set `platform_instance` in the recipes of the original source/sink systems. Refer to the document Working with Platform Instances to learn more.

There are two options for declaring a source/sink system's `platform_instance` in the kafka-connect recipe. If a single instance of a platform is used across all Kafka Connect connectors, you can use `platform_instance_map` to specify which `platform_instance` to use for each platform when constructing URNs for lineage.
Example:
```yaml
# Map of platform name to platform instance
platform_instance_map:
  snowflake: snowflake_platform_instance
  mysql: mysql_platform_instance
```
If multiple instances of a platform are used across Kafka Connect connectors, you need to specify which `platform_instance` to use for each platform on a per-connector basis.
```yaml
# Map of platform name to platform instance per connector
connect_to_platform_map:
  mysql_connector1:
    mysql: mysql_instance1
  mysql_connector2:
    mysql: mysql_instance2
```
Here, `mysql_connector1` and `mysql_connector2` are the names of MySQL source connectors as defined in the Kafka Connect connector configs. You can also specify the Kafka platform instance for each connector in the same way:
```yaml
connect_to_platform_map:
  mysql_connector1:
    mysql: mysql_instance1
    kafka: kafka_instance1
  mysql_connector2:
    mysql: mysql_instance2
    kafka: kafka_instance2
```
You can also use a combination of `platform_instance_map` and `connect_to_platform_map` in your recipe. Note that the `platform_instance` specified for a connector in `connect_to_platform_map` always takes precedence, even if a `platform_instance` for the same platform is set in `platform_instance_map`.
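A sketch of that combination, with illustrative instance names; the per-connector entry wins for `mysql_connector1`:

```yaml
platform_instance_map:
  mysql: mysql_default_instance # used when no per-connector override exists
connect_to_platform_map:
  mysql_connector1:
    mysql: mysql_instance1 # overrides mysql_default_instance for this connector
```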
If you do not use `platform_instance` in the original source/sink recipes, you do not need to specify it in the above configurations.

Note that you do not need to specify `platform_instance` for BigQuery.
```yaml
connect_to_platform_map:
  bigquery_connector1:
    kafka: kafka_instance1
  bigquery_connector2:
    kafka: kafka_instance2
```
Kafka Connect supports pluggable configuration providers, which can load configuration data from external sources at runtime. These values are not available to the DataHub ingestion source through the Kafka Connect APIs. If you use such provided configurations to specify connection URLs (database, etc.) in the Kafka Connect connector configuration, you will also need to add them to the `provided_configs` section of the recipe for DataHub to generate correct lineage.
```yaml
# Optional mapping of provider configurations if using
provided_configs:
  - provider: env
    path_key: MYSQL_CONNECTION_URL
    value: jdbc:mysql://test_mysql:3306/librarydb
```
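To connect the dots, the connector config would reference the provided value rather than the literal URL; the exact `${...}` reference syntax depends on how the config provider is registered on the Connect worker, so treat this as an illustration:

```yaml
# Illustrative connector configuration referencing the provider above
connection.url: "${env:MYSQL_CONNECTION_URL}"
# DataHub resolves this to jdbc:mysql://test_mysql:3306/librarydb via provided_configs
```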
Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.
If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.
Disable topic discovery entirely for environments where API access is not available or not needed:
```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    use_connect_topics_api: false # Disables all topic discovery API calls
```
Note: When use_connect_topics_api is false, topic information will not be extracted, which may impact lineage accuracy but improves performance and works in air-gapped environments.
Problem: Missing or incomplete topic information in lineage
Solutions:
Verify Environment Detection:
```
# Check logs for environment detection messages
# Self-hosted: "Detected self-hosted Kafka Connect - using runtime topics API"
# Confluent Cloud: "Detected Confluent Cloud - using comprehensive Kafka REST API topic retrieval"
```
Test API Connectivity:
```bash
# For self-hosted - test topics API
curl -X GET "http://localhost:8083/connectors/{connector-name}/topics"

# For Confluent Cloud - test Kafka REST API v3
curl -X GET "https://pkc-xxxxx.region.provider.confluent.cloud/kafka/v3/clusters/{cluster-id}/topics"
```
Configuration Troubleshooting:
```yaml
# Enable debug logging
source:
  type: kafka-connect
  config:
    # ... other config ...
    use_connect_topics_api: true # Ensure this is enabled (default)
```
Self-hosted Issues:
- Verify authentication credentials (`username`, `password`)

Confluent Cloud Issues:
- Check the connector's source table configuration (`table.include.list`, `query`)
- Verify that the original table name format (e.g., `public.users`) is maintained through the transform pipeline

If topic discovery is impacting performance:
```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    use_connect_topics_api: false # Disable for better performance (no topic info)
```