# Real Metrics Collection
The real metrics collection system provides accurate, real-time performance monitoring for claude-flow benchmarks. Unlike simulated metrics, it captures actual execution data: wall-clock timings, CPU and memory usage, and per-process statistics for every command that runs.
## Architecture

```
claude-flow command
        ↓
ProcessTracker (subprocess monitoring)
        ↓
PerformanceCollector + ResourceMonitor (real-time sampling)
        ↓
MetricsAggregator (aggregation & analysis)
        ↓
BenchmarkMetrics (final results)
```
## CLI Usage

Run benchmarks with real metrics collection:

```bash
swarm-benchmark run "Your objective" --real-metrics
```
## Python API

Or drive the engine directly from Python:

```python
from swarm_benchmark.core.real_benchmark_engine import RealBenchmarkEngine
from swarm_benchmark.core.models import BenchmarkConfig, CoordinationMode, StrategyType

config = BenchmarkConfig(
    name="my-benchmark",
    strategy=StrategyType.AUTO,
    mode=CoordinationMode.CENTRALIZED,
)

engine = RealBenchmarkEngine(config)
result = await engine.run_benchmark("Build a REST API")  # inside an async context

# Access the collected metrics
metrics = result["metrics"]
print(f"Peak memory: {metrics['peak_memory_mb']} MB")
print(f"Success rate: {metrics['success_rate']}")
```
## Sampling Interval

The system samples metrics at configurable intervals (default 100 ms); shorter intervals give finer-grained data at the cost of more collection overhead:

```python
collector = PerformanceCollector(sample_interval=0.05)  # 50 ms sampling
```
## Resource Alerts

Set thresholds and receive alerts when they are exceeded:

```python
monitor = ResourceMonitor(alert_callback=handle_alert)
monitor.set_thresholds({
    "cpu_percent": 80.0,
    "memory_mb": 1024.0,
})
```
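The callback's signature is not pinned down here, so the following is a minimal sketch assuming the monitor passes the metric name and the observed value:

```python
def handle_alert(metric: str, value: float) -> None:
    # Hypothetical signature: we assume the monitor passes the
    # threshold key (e.g. "cpu_percent") and the measured value.
    print(f"ALERT: {metric} exceeded its threshold (observed {value})")
```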
## Process Tracking

Track every claude-flow subprocess execution; the tracker records per-command statistics that feed the process report shown below:

```python
tracker = ProcessTracker()
result = await tracker.execute_command_async(
    ["swarm", "Build API", "--parallel"],
    timeout=300,
)
```
## Metrics Aggregation

Aggregate metrics from multiple sources:

```python
aggregator = MetricsAggregator()
aggregator.start_collection()

# Create named collectors, one pair per agent
perf1 = aggregator.create_performance_collector("agent1")
res1 = aggregator.create_resource_monitor("agent1")

# ... execute tasks ...

# Stop collection and get the aggregated results
metrics = aggregator.stop_collection()
```
## Output Reports

### Metrics Report

Saved as `metrics_{benchmark_id}.json`:

```json
{
  "summary": {
    "wall_clock_time": 45.23,
    "tasks_per_second": 2.21,
    "success_rate": 0.95,
    "peak_memory_mb": 256.4,
    "average_cpu_percent": 65.3
  },
  "performance": {
    "average_duration": 0.45,
    "median_duration": 0.42,
    "p95_duration": 0.68,
    "p99_duration": 0.89
  },
  "resources": {
    "memory": {
      "peak_mb": 256.4,
      "average_mb": 185.2
    },
    "cpu": {
      "average_percent": 65.3,
      "total_seconds": 29.5
    }
  }
}
```
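Because the report is plain JSON, post-processing is straightforward. A minimal sketch that loads a saved report and prints the headline numbers (the benchmark ID in the filename is illustrative):

```python
import json
from pathlib import Path

report = json.loads(Path("reports/metrics_abc123.json").read_text())

summary = report["summary"]
print(f"Success rate: {summary['success_rate']:.0%}")   # e.g. 95%
print(f"Peak memory:  {summary['peak_memory_mb']} MB")
print(f"p95 duration: {report['performance']['p95_duration']}s")
```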
### Process Report

Saved as `process_report_{benchmark_id}.json`:

```json
{
  "summary": {
    "total_executions": 10,
    "successful_executions": 9,
    "failed_executions": 1,
    "overall_success_rate": 0.9,
    "average_duration": 4.52
  },
  "command_statistics": {
    "swarm:research": {
      "execution_count": 5,
      "success_rate": 1.0,
      "average_duration": 3.2,
      "peak_memory_mb": 128.5
    }
  }
}
```
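The per-command statistics make it easy to spot slow or flaky commands. A sketch, again with an illustrative benchmark ID:

```python
import json
from pathlib import Path

report = json.loads(Path("reports/process_report_abc123.json").read_text())

for command, stats in report["command_statistics"].items():
    print(f"{command}: {stats['execution_count']} runs, "
          f"{stats['success_rate']:.0%} success, "
          f"avg {stats['average_duration']}s")
```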
## Custom Collectors

Extend the base collectors for custom metrics:

```python
class CustomCollector(PerformanceCollector):
    def _collect_custom_metrics(self):
        # Add custom metric collection here
        pass
```
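As a concrete (hypothetical) illustration, a subclass could sample the number of open file descriptors alongside the built-in metrics. The return-a-dict contract of `_collect_custom_metrics` is an assumption; check the base class for the actual hook:

```python
import psutil

class FDCountCollector(PerformanceCollector):
    """Hypothetical collector that also tracks open file descriptors."""

    def _collect_custom_metrics(self):
        proc = psutil.Process()
        # num_fds() is Unix-only; on Windows, use proc.num_handles() instead.
        return {"open_fds": proc.num_fds()}
```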
## Batch Execution

Process multiple tasks with shared metrics:

```python
results = await engine.execute_batch(tasks)
```
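How the tasks themselves are constructed is not shown here; assuming a `Task` model lives in `swarm_benchmark.core.models` alongside `BenchmarkConfig`, a batch run might look like:

```python
from swarm_benchmark.core.models import Task  # assumed location of the Task model

tasks = [
    Task(objective="Build a REST API"),        # constructor fields are assumptions
    Task(objective="Write integration tests"),
]
results = await engine.execute_batch(tasks)
```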
## Trend Analysis

Compare metrics across benchmark runs:

```python
# Load historical metrics and analyze trends
history = load_metrics_history("./reports")
trends = analyze_performance_trends(history)
```
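`load_metrics_history` and `analyze_performance_trends` are used above without definitions; a minimal sketch of how they might be implemented, assuming reports are stored as the `metrics_{benchmark_id}.json` files described earlier:

```python
import json
from pathlib import Path

def load_metrics_history(reports_dir: str) -> list[dict]:
    # Read every saved metrics report, oldest first.
    paths = sorted(Path(reports_dir).glob("metrics_*.json"),
                   key=lambda p: p.stat().st_mtime)
    return [json.loads(p.read_text()) for p in paths]

def analyze_performance_trends(history: list[dict]) -> dict:
    # Toy trend analysis over the summary fields shown above.
    rates = [run["summary"]["success_rate"] for run in history]
    return {
        "runs": len(rates),
        "success_rate_delta": rates[-1] - rates[0] if len(rates) > 1 else 0.0,
    }
```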
## Performance Overhead

The metrics collection system is designed for minimal overhead. For production benchmarks, keep the default 100 ms sampling interval (or a longer one) so that collection itself does not skew the measurements.
## Troubleshooting

Enable detailed logging:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
```
## Examples

See `/benchmark/examples/real_metrics_demo.py` for comprehensive examples.