connect/runtime/README.md
This integration allows Kafka Connect to export metrics through the AutoMQ OpenTelemetry module, enabling unified observability across your Kafka ecosystem.
Add the following to your Kafka Connect configuration file (`connect-distributed.properties` or `connect-standalone.properties`):

```properties
# Enable OpenTelemetry MetricsReporter
metric.reporters=org.apache.kafka.connect.automq.metrics.OpenTelemetryMetricsReporter

# OpenTelemetry configuration
opentelemetry.metrics.enabled=true
opentelemetry.metrics.prefix=kafka.connect

# Optional: Filter metrics
opentelemetry.metrics.include.pattern=.*connector.*|.*task.*|.*worker.*
opentelemetry.metrics.exclude.pattern=.*jmx.*|.*debug.*
```
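The include/exclude patterns are standard Java regular expressions; conceptually, a metric is exported only when it matches the include pattern and does not match the exclude pattern. A minimal sketch of that filtering logic, using the example patterns above (the `shouldExport` method is illustrative, not the reporter's actual API):

```java
import java.util.regex.Pattern;

public class MetricFilterSketch {
    // Patterns mirroring the example configuration above
    private static final Pattern INCLUDE = Pattern.compile(".*connector.*|.*task.*|.*worker.*");
    private static final Pattern EXCLUDE = Pattern.compile(".*jmx.*|.*debug.*");

    // A metric is exported only if it matches the include pattern
    // and does not match the exclude pattern.
    static boolean shouldExport(String metricName) {
        return INCLUDE.matcher(metricName).matches()
                && !EXCLUDE.matcher(metricName).matches();
    }

    public static void main(String[] args) {
        System.out.println(shouldExport("kafka.connect.connector.task.batch.size.avg")); // true
        System.out.println(shouldExport("kafka.connect.jmx.debug.metric"));              // false
    }
}
```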
Ensure the AutoMQ telemetry module is properly configured. Add these properties to your application configuration:

```properties
# Telemetry export configuration
automq.telemetry.exporter.uri=prometheus://localhost:9090
# or for OTLP: automq.telemetry.exporter.uri=otlp://localhost:4317

# Service identification
service.name=kafka-connect
service.instance.id=connect-worker-1

# Export settings
automq.telemetry.exporter.interval.ms=30000
automq.telemetry.metric.cardinality.limit=10000
```
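The exporter URI's scheme selects the backend (`prometheus://` or `otlp://`), while the authority part carries the endpoint. A hedged sketch of how such a URI could be decomposed with `java.net.URI` (the `ExporterTarget` record is illustrative, not part of the AutoMQ API):

```java
import java.net.URI;

public class ExporterUriSketch {
    // Illustrative holder for the parsed parts; not an AutoMQ class.
    record ExporterTarget(String scheme, String host, int port) {}

    // java.net.URI accepts custom schemes such as prometheus:// and otlp://.
    static ExporterTarget parse(String uriString) {
        URI uri = URI.create(uriString);
        return new ExporterTarget(uri.getScheme(), uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        ExporterTarget t = parse("prometheus://localhost:9090");
        System.out.println(t.scheme() + " -> " + t.host() + ":" + t.port());
        // prometheus -> localhost:9090
    }
}
```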
Kafka Connect bundles the AutoMQ log uploader, so worker logs can be streamed to S3 with in-cluster cleanup of uploaded objects. The uploader uses the connect-leader election mechanism by default and requires no additional configuration.
Add the following properties to your worker configuration (ConfigMap, properties file, etc.):

```properties
# Enable S3 log upload
log.s3.enable=true
log.s3.bucket=0@s3://your-log-bucket?region=us-east-1

# Optional overrides (defaults shown)
log.s3.selector.type=connect-leader

# Provide credentials if the bucket URI does not embed them
# log.s3.access.key=...
# log.s3.secret.key=...
```
`log.s3.node.id` defaults to a hash of the pod hostname if not provided, ensuring objects are partitioned per worker.
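The exact hashing scheme is internal to the uploader; as an illustration of the idea, a stable, non-negative node id can be derived from the pod hostname like this (`deriveNodeId` is a hypothetical helper, not the uploader's actual code):

```java
public class NodeIdSketch {
    // Hypothetical: derive a stable, non-negative node id from a hostname.
    static int deriveNodeId(String hostname) {
        // Masking with Integer.MAX_VALUE keeps the value non-negative,
        // including for the Integer.MIN_VALUE edge case that Math.abs mishandles.
        return hostname.hashCode() & Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The same hostname always yields the same id, so each worker's
        // log objects land in a consistent partition.
        System.out.println(deriveNodeId("connect-worker-0"));
        System.out.println(deriveNodeId("connect-worker-1"));
    }
}
```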
`config/connect-log4j.properties` switches `connectAppender` to `com.automq.log.S3RollingFileAppender` and specifies `org.apache.kafka.connect.automq.log.ConnectS3LogConfigProvider` as the config provider. As long as you set `log.s3.enable=true` and configure the bucket in the worker config, log upload is initialized automatically with the Connect process; if the property is unset or set to `log.s3.enable=false`, the uploader remains disabled.
```java
import com.automq.opentelemetry.AutoMQTelemetryManager;
import java.util.Properties;

// Initialize AutoMQ telemetry before starting Kafka Connect
Properties telemetryProps = new Properties();
telemetryProps.setProperty("automq.telemetry.exporter.uri", "prometheus://localhost:9090");
telemetryProps.setProperty("service.name", "kafka-connect");
telemetryProps.setProperty("service.instance.id", "worker-1");

// Initialize the singleton instance
AutoMQTelemetryManager.initializeInstance(telemetryProps);

// Now start Kafka Connect - it will automatically use the OpenTelemetryMetricsReporter

// When shutting down your application
AutoMQTelemetryManager.shutdownInstance();
```
The integration automatically converts Kafka Connect metrics to OpenTelemetry format.

Metric names follow the pattern `kafka.connect.{group}.{metric_name}`, with the dots after the prefix replaced by underscores:

- `kafka.connect.connector.task.batch.size.avg` → `kafka.connect.connector_task_batch_size_avg`

Kafka metric tags are converted to OpenTelemetry attributes:

- `connector` → `connector`
- `task` → `task`
- `worker-id` → `worker_id`

The `metric.group`, `service.name`, and `service.instance.id` attributes are also attached.

Common Kafka Connect metrics that will be exported:
```text
# Connector metrics
kafka.connect.connector.startup.attempts.total
kafka.connect.connector.startup.success.total
kafka.connect.connector.startup.failure.total

# Task metrics
kafka.connect.connector.task.batch.size.avg
kafka.connect.connector.task.batch.size.max
kafka.connect.connector.task.offset.commit.avg.time.ms

# Worker metrics
kafka.connect.worker.connector.count
kafka.connect.worker.task.count
kafka.connect.worker.connector.startup.attempts.total
```
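The naming conversion described above can be sketched as a simple string transformation: the `kafka.connect.` prefix is kept, dots in the remainder become underscores, and dashes in tag keys become underscores (this illustrates the naming rule, not the reporter's actual implementation):

```java
public class MetricNameSketch {
    private static final String PREFIX = "kafka.connect.";

    // Keep the prefix; normalize dots in the group/name portion to underscores.
    static String toOtelName(String kafkaMetricName) {
        if (!kafkaMetricName.startsWith(PREFIX)) {
            return kafkaMetricName.replace('.', '_');
        }
        String suffix = kafkaMetricName.substring(PREFIX.length());
        return PREFIX + suffix.replace('.', '_');
    }

    // Tag keys: dashes become underscores (e.g. worker-id -> worker_id).
    static String toOtelAttribute(String tagKey) {
        return tagKey.replace('-', '_');
    }

    public static void main(String[] args) {
        System.out.println(toOtelName("kafka.connect.connector.task.batch.size.avg"));
        // kafka.connect.connector_task_batch_size_avg
        System.out.println(toOtelAttribute("worker-id")); // worker_id
    }
}
```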
| Property | Description | Default | Example |
|---|---|---|---|
| `opentelemetry.metrics.enabled` | Enable/disable metrics export | `true` | `false` |
| `opentelemetry.metrics.prefix` | Metric name prefix | `kafka.connect` | `my.connect` |
| `opentelemetry.metrics.include.pattern` | Regex for included metrics | All metrics | `.*connector.*` |
| `opentelemetry.metrics.exclude.pattern` | Regex for excluded metrics | None | `.*jmx.*` |
| Property | Description | Default |
|---|---|---|
| `automq.telemetry.exporter.uri` | Exporter endpoint | Empty |
| `automq.telemetry.exporter.interval.ms` | Export interval (ms) | `60000` |
| `automq.telemetry.metric.cardinality.limit` | Max metric cardinality | `20000` |
```promql
# Connector count by worker
kafka_connect_worker_connector_count

# Task startup failure rate
rate(kafka_connect_connector_task_startup_failure_total[5m])

# Average batch size
kafka_connect_connector_task_batch_size_avg

# Connector startup success rate
rate(kafka_connect_connector_startup_success_total[5m]) /
rate(kafka_connect_connector_startup_attempts_total[5m])
```
Common panels to create include connector count by worker, task startup failure rate, and connector startup success rate.
**Metrics not appearing**
- Check logs for: `AutoMQTelemetryManager is not initialized`
- Solution: ensure `AutoMQTelemetryManager.initializeInstance()` is called before Connect starts.

**High cardinality warnings**
- Solution: use the include/exclude patterns to filter metrics.

**Missing dependencies**
- Ensure `connect-runtime` depends on the `opentelemetry` module.
Enable debug logging to troubleshoot:

```properties
log4j.logger.org.apache.kafka.connect.automq=DEBUG
log4j.logger.com.automq.opentelemetry=DEBUG
```
This integration works alongside your existing monitoring setup: the OpenTelemetry integration provides a unified export path while preserving the monitoring you already have in place.