automq-metrics/README.md
com.automq.opentelemetry/
├── AutoMQTelemetryManager.java # Main management class for initialization and lifecycle
├── TelemetryConstants.java # Constants definition
├── common/
│ ├── OTLPCompressionType.java # OTLP compression types
│ └── OTLPProtocol.java # OTLP protocol types
├── exporter/
│ ├── MetricsExporter.java # Exporter interface
│ ├── MetricsExportConfig.java # Export configuration
│ ├── MetricsExporterProvider.java # Exporter factory provider
│ ├── MetricsExporterType.java # Exporter type enumeration
│ ├── MetricsExporterURI.java # URI parser for exporters
│ ├── OTLPMetricsExporter.java # OTLP exporter implementation
│ ├── PrometheusMetricsExporter.java # Prometheus exporter implementation
│ └── s3/ # S3 metrics exporter implementation
│ ├── CompressionUtils.java # Utility for data compression
│ ├── PrometheusUtils.java # Utilities for Prometheus format
│ ├── S3MetricsExporter.java # S3 metrics exporter implementation
│ └── S3MetricsExporterAdapter.java # Adapter to handle S3 metrics export
└── yammer/
├── DeltaHistogram.java # Delta histogram implementation
├── OTelMetricUtils.java # OpenTelemetry metrics utilities
├── YammerMetricsProcessor.java # Yammer metrics processor
└── YammerMetricsReporter.java # Yammer metrics reporter
The AutoMQ OpenTelemetry module is a telemetry data collection and export component based on the OpenTelemetry SDK, designed specifically for AutoMQ Kafka. It provides unified telemetry data management: it collects JVM metrics, JMX metrics, and Yammer metrics, and can export them to Prometheus, OTLP-compatible backend systems, or S3-compatible storage.
com.automq.opentelemetry/
├── AutoMQTelemetryManager.java # Main management class for initialization and lifecycle
├── TelemetryConfig.java # Configuration management class
├── TelemetryConstants.java # Constants definition
├── common/
│ └── MetricsUtils.java # Metrics utility class
├── exporter/
│ ├── MetricsExporter.java # Exporter interface
│ ├── MetricsExporterURI.java # URI parser
│ ├── OTLPMetricsExporter.java # OTLP exporter implementation
│ ├── PrometheusMetricsExporter.java # Prometheus exporter implementation
│ └── s3/ # S3 metrics exporter implementation
│ ├── CompressionUtils.java # Utility for data compression
│ ├── PrometheusUtils.java # Utilities for Prometheus format
│ ├── S3MetricsConfig.java # Configuration interface
│ ├── S3MetricsExporter.java # S3 metrics exporter implementation
│ ├── S3MetricsExporterAdapter.java # Adapter to handle S3 metrics export
│ ├── LeaderNodeSelector.java # Interface for node selection logic
│ └── LeaderNodeSelectors.java # Factory for node selector implementations
└── yammer/
├── DeltaHistogram.java # Delta histogram implementation
├── OTelMetricUtils.java # OpenTelemetry metrics utilities
├── YammerMetricsProcessor.java # Yammer metrics processor
└── YammerMetricsReporter.java # Yammer metrics reporter
import com.automq.opentelemetry.AutoMQTelemetryManager;
import com.automq.opentelemetry.exporter.MetricsExportConfig;
// Implement MetricsExportConfig
public class MyMetricsExportConfig implements MetricsExportConfig {
@Override
public String clusterId() { return "my-cluster"; }
@Override
public boolean isLeader() { return true; }
@Override
public int nodeId() { return 1; }
@Override
public ObjectStorage objectStorage() {
// Return your object storage instance for S3 exports
return myObjectStorage;
}
@Override
public List<Pair<String, String>> baseLabels() {
return Arrays.asList(
Pair.of("environment", "production"),
Pair.of("region", "us-east-1")
);
}
@Override
public int intervalMs() { return 60000; } // 60 seconds
}
// Create export configuration
MetricsExportConfig config = new MyMetricsExportConfig();
// Initialize telemetry manager singleton
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"prometheus://localhost:9090", // exporter URI
"automq-kafka", // service name
"broker-1", // instance ID
config // export config
);
// Start Yammer metrics reporting (optional)
MetricsRegistry yammerRegistry = // Get Kafka's Yammer registry
manager.startYammerMetricsReporter(yammerRegistry);
// Application running...
// Shutdown telemetry system
AutoMQTelemetryManager.shutdownInstance();
// Get the singleton instance
AutoMQTelemetryManager manager = AutoMQTelemetryManager.getInstance();
// Get Meter for custom metrics
Meter meter = manager.getMeter();
// Create custom metrics
LongCounter requestCounter = meter
.counterBuilder("http_requests_total")
.setDescription("Total number of HTTP requests")
.build();
requestCounter.add(1, Attributes.of(AttributeKey.stringKey("method"), "GET"));
Configuration is provided through the MetricsExportConfig interface and constructor parameters:
| Parameter | Description | Example |
|---|---|---|
| `exporterUri` | Metrics exporter URI | `prometheus://localhost:9090` |
| `serviceName` | Service name for telemetry | `automq-kafka` |
| `instanceId` | Unique service instance ID | `broker-1` |
| `config` | `MetricsExportConfig` implementation | See example above |
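All configuration is done through the `MetricsExportConfig` interface and the parameters passed to `initializeInstance()`. Export intervals, compression settings, and other options are controlled through the exporter URI (its scheme and optional query parameters such as `protocol`, `compression`, and `timeout`) and through the `MetricsExportConfig` implementation (for example, `intervalMs()`), as shown in the examples below: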
// Use prometheus:// URI scheme
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"prometheus://localhost:9090",
"automq-kafka",
"broker-1",
config
);
// Use otlp:// URI scheme with optional query parameters
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"otlp://localhost:4317?protocol=grpc&compression=gzip&timeout=30000",
"automq-kafka",
"broker-1",
config
);
// Use s3:// URI scheme
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"s3://access-key:[email protected]",
"automq-kafka",
"broker-1",
config // config.clusterId(), nodeId(), isLeader() used for S3 export
);
Example usage with S3 exporter:
// Implementation for S3 export configuration
public class S3MetricsExportConfig implements MetricsExportConfig {
private final ObjectStorage objectStorage;
public S3MetricsExportConfig(ObjectStorage objectStorage) {
this.objectStorage = objectStorage;
}
@Override
public String clusterId() { return "my-kafka-cluster"; }
@Override
public boolean isLeader() {
// Only one node in the cluster should return true
return isCurrentNodeLeader();
}
@Override
public int nodeId() { return 1; }
@Override
public ObjectStorage objectStorage() { return objectStorage; }
@Override
public List<Pair<String, String>> baseLabels() {
return Arrays.asList(Pair.of("environment", "production"));
}
@Override
public int intervalMs() { return 60000; }
}
// Initialize telemetry manager with S3 export
ObjectStorage objectStorage = // Create your object storage instance
MetricsExportConfig config = new S3MetricsExportConfig(objectStorage);
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"s3://access-key:secret-key@bucket-name.endpoint.com",
"automq-kafka",
"broker-1",
config
);
// Application running...
// Shutdown telemetry system
AutoMQTelemetryManager.shutdownInstance();
Define JMX metrics collection rules through YAML configuration files:
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
exporterUri, serviceName, instanceId, config
);
// Set JMX config paths after initialization
manager.setJmxConfigPaths("/jmx-config.yaml,/kafka-jmx.yaml");
Directory Requirements:
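- Configuration files must be located on the classpath (usually the `src/main/resources` directory)
- Subdirectories are supported, e.g. `/config/jmx-metrics.yaml`

Path Format:
- Paths must start with `/` to indicate starting from classpath root
- Multiple configuration files are separated by commas, e.g. `/jmx-config.yaml,/kafka-jmx.yaml`

File Format:
- Files must be in YAML format (`.yaml` or `.yml` extension)

Example layout:
src/main/resources/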
├── jmx-kafka-broker.yaml # Kafka Broker metrics configuration
├── jmx-kafka-consumer.yaml # Kafka Consumer metrics configuration
├── jmx-kafka-producer.yaml # Kafka Producer metrics configuration
└── config/
├── custom-jmx.yaml # Custom JMX metrics configuration
└── third-party-jmx.yaml # Third-party component JMX configuration
JMX configuration file example (jmx-config.yaml):
rules:
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
metricAttribute:
name: kafka_server_broker_topic_messages_in_per_sec
description: Messages in per second
unit: "1/s"
attributes:
- name: topic
value: topic
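Through Yammer metrics bridging, the module supports the following types of Kafka metrics:
- `BytesInPerSec` - Bytes input per second
- `BytesOutPerSec` - Bytes output per second
- `Size` - Log size (for identifying idle partitions)

Custom metrics can also be created through the OpenTelemetry API (see the `getMeter()` example earlier in this document). The following examples show complete configurations for production and development environments: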
public class ProductionMetricsConfig implements MetricsExportConfig {
@Override
public String clusterId() { return "production-cluster"; }
@Override
public boolean isLeader() {
// Implement your leader election logic
return isCurrentNodeController();
}
@Override
public int nodeId() { return getCurrentNodeId(); }
@Override
public ObjectStorage objectStorage() {
return productionObjectStorage;
}
@Override
public List<Pair<String, String>> baseLabels() {
return Arrays.asList(
Pair.of("environment", "production"),
Pair.of("region", System.getenv("AWS_REGION")),
Pair.of("version", getApplicationVersion())
);
}
@Override
public int intervalMs() { return 60000; } // 1 minute
}
// Initialize for production
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"prometheus://0.0.0.0:9090", // Or S3 URI for object storage export
"automq-kafka",
System.getenv("HOSTNAME"),
new ProductionMetricsConfig()
);
public class DevelopmentMetricsConfig implements MetricsExportConfig {
@Override
public String clusterId() { return "dev-cluster"; }
@Override
public boolean isLeader() { return true; } // Single node in dev
@Override
public int nodeId() { return 1; }
@Override
public ObjectStorage objectStorage() { return null; } // Not needed for OTLP
@Override
public List<Pair<String, String>> baseLabels() {
return Arrays.asList(Pair.of("environment", "development"));
}
@Override
public int intervalMs() { return 10000; } // 10 seconds for faster feedback
}
// Initialize for development
AutoMQTelemetryManager manager = AutoMQTelemetryManager.initializeInstance(
"otlp://localhost:4317",
"automq-kafka-dev",
"local-dev",
new DevelopmentMetricsConfig()
);
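- Resource cleanup: call the `shutdown()` method when the application closes to release resources

Troubleshooting

Metrics not exported
- Check that the exporter URI passed to `initializeInstance()` is correct
- Verify that `MetricsExportConfig.intervalMs()` returns a reasonable value

JMX metrics missing
- Confirm that the JMX configuration path passed to `setJmxConfigPaths()` is correct

High memory usage
- Review your `MetricsExportConfig` implementation
- Reduce the number of labels returned by `baseLabels()`
- Increase the export interval returned by `intervalMs()`

Enable debug logging for more information using your logging framework configuration (e.g., logback.xml, log4j2.xml):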
<!-- For Logback -->
<logger name="com.automq.opentelemetry" level="DEBUG" />
<logger name="io.opentelemetry" level="INFO" />
This module is open source under the Apache License 2.0.