enterprise/doc/design-doc/openhands-enterprise-telemetry-design.md
OpenHands Enterprise (OHE) helm charts are publicly available but not open source, creating a visibility gap for the sales team. Unknown users can install and use OHE without the vendor's knowledge, preventing proper customer engagement and sales pipeline management. Without usage telemetry, the vendor cannot identify potential customers, track installation health, or proactively support users who may need assistance.
We propose implementing a comprehensive telemetry service that leverages the Replicated metrics platform and Python SDK to track OHE installations and usage. The solution provides automatic customer discovery, instance monitoring, and usage metrics collection while maintaining a clear license compliance pathway.
The system consists of three main components: (1) a pluggable metrics collection framework that allows developers to easily define and register custom metrics collectors, (2) automated cronjobs that periodically collect metrics and upload them to Replicated's vendor portal, and (3) a license compliance warning system that displays UI notifications when telemetry uploads fail, indicating potential license expiration.
The design ensures that telemetry cannot be easily disabled without breaking core OHE functionality by tying the warning system to environment variables that are essential for OHE operation. This approach balances user transparency with business requirements for customer visibility.
When telemetry uploads fail for more than 4 days, users will see a prominent warning banner in the OpenHands Enterprise UI:
⚠️ Your OpenHands Enterprise license will expire in 30 days. Please contact support if this issue persists.
The banner appears at the top of all pages and cannot be permanently dismissed while the condition persists. Users can temporarily dismiss it, but it will reappear on page refresh until telemetry uploads resume successfully.
System administrators will not need to configure the telemetry system manually. The service automatically:
Detects OHE installations using existing required environment variables (GITHUB_APP_CLIENT_ID, KEYCLOAK_SERVER_URL, etc.)
Generates unique customer identifiers using administrator contact information:
OPENHANDS_ADMIN_EMAIL environment variable (if set in helm values)
Email of the first user who accepted the Terms of Service (earliest accepted_tos timestamp)
Collects and uploads metrics transparently in the background via weekly collection and daily upload cronjobs
Displays warnings only when necessary for license compliance - no notifications appear during normal operation
The Replicated platform provides vendor-hosted infrastructure for collecting customer and instance telemetry. The Python SDK handles authentication, state management, and reliable metric delivery. Key concepts:
The SDK automatically handles machine fingerprinting, local state caching, and retry logic for failed uploads.
To identify the appropriate administrator contact for sales outreach, the system uses a two-tier lookup with no further fallback, which avoids performance penalties on user authentication:
Tier 1: Explicit Configuration - The OPENHANDS_ADMIN_EMAIL environment variable allows administrators to explicitly specify the contact email during deployment.
Tier 2: First Active User Detection - If no explicit email is configured, the system identifies the first user who accepted Terms of Service (earliest accepted_tos timestamp with a valid email). This represents the first person to actively engage with the system and is very likely the administrator or installer.
No Fallback Needed - If neither email source is available, the telemetry upload is skipped entirely. This ensures we only report meaningful usage data when there are actual active users.
Performance Optimization: The admin email determination is performed only during telemetry upload attempts, ensuring zero performance impact on user login flows.
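The lookup order described above can be sketched as a pure function. This is a minimal illustration rather than the project's implementation: `resolve_admin_email` and the `(email, accepted_tos)` tuples are hypothetical stand-ins for the environment lookup and the `UserSettings` query.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional, Tuple

# Hypothetical stand-in for a UserSettings row: (email, accepted_tos).
UserRow = Tuple[Optional[str], Optional[datetime]]


def resolve_admin_email(env_email: Optional[str], users: Iterable[UserRow]) -> Optional[str]:
    """Return the contact email, or None when the upload should be skipped."""
    # Tier 1: an explicitly configured address always wins.
    if env_email:
        return env_email
    # Tier 2: earliest Terms of Service acceptance among users with an email.
    candidates = [(tos, email) for email, tos in users if email and tos]
    if candidates:
        return min(candidates)[1]
    # No fallback: the caller skips telemetry for this run.
    return None


users = [
    ("late@example.com", datetime(2025, 3, 1, tzinfo=timezone.utc)),
    ("first@example.com", datetime(2025, 1, 15, tzinfo=timezone.utc)),
    (None, datetime(2024, 12, 1, tzinfo=timezone.utc)),
]
print(resolve_admin_email(None, users))
```

Because the function only runs inside the upload job, nothing here touches the login path.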
The proposed collector framework allows developers to define metrics in a single file change:
@register_collector("user_activity")
class UserActivityCollector(MetricsCollector):
    collector_name = "user_activity"

    async def collect(self) -> List[MetricResult]:
        # Query the database; `count` and `total` stand for the query results
        return [MetricResult("active_users_7d", count), MetricResult("conversations_created", total)]
Collectors are automatically discovered and executed by the collection cronjob, making the system extensible without modifying core collection logic.
Stores collected metrics with transmission status tracking:
CREATE TABLE telemetry_metrics (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    collected_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    metrics_data JSONB NOT NULL,
    uploaded_at TIMESTAMP WITH TIME ZONE NULL,
    upload_attempts INTEGER DEFAULT 0,
    last_upload_error TEXT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_telemetry_metrics_collected_at ON telemetry_metrics(collected_at);
CREATE INDEX idx_telemetry_metrics_uploaded_at ON telemetry_metrics(uploaded_at);
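The transmission state this table tracks can be explored with a small self-contained sketch. It uses SQLite from the standard library in place of PostgreSQL, so `JSONB`, `UUID`, and `gen_random_uuid()` are replaced with plain `TEXT` columns and timestamps are ISO-8601 strings. Those substitutions are assumptions made for illustration only.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE telemetry_metrics (
        id TEXT PRIMARY KEY,
        collected_at TEXT NOT NULL,
        metrics_data TEXT NOT NULL,
        uploaded_at TEXT NULL,
        upload_attempts INTEGER DEFAULT 0,
        last_upload_error TEXT NULL
    )
    """
)

rows = [
    # (collected_at, uploaded_at): one delivered batch, two still pending.
    ("2025-01-05T02:00:00+00:00", "2025-01-05T03:00:00+00:00"),
    ("2025-01-12T02:00:00+00:00", None),
    ("2025-01-19T02:00:00+00:00", None),
]
for collected_at, uploaded_at in rows:
    conn.execute(
        "INSERT INTO telemetry_metrics (id, collected_at, metrics_data, uploaded_at) "
        "VALUES (?, ?, ?, ?)",
        (str(uuid.uuid4()), collected_at, json.dumps({"total_users": 3}), uploaded_at),
    )

# Pending batches are the rows whose uploaded_at is still NULL, oldest first.
pending = [
    row[0]
    for row in conn.execute(
        "SELECT collected_at FROM telemetry_metrics "
        "WHERE uploaded_at IS NULL ORDER BY collected_at"
    )
]

# The last successful upload is derived, not stored: MAX ignores NULLs.
last_upload = conn.execute("SELECT MAX(uploaded_at) FROM telemetry_metrics").fetchone()[0]

print(pending)
print(last_upload)
```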
Stores persistent identity information that must survive container restarts:
CREATE TABLE telemetry_identity (
    id INTEGER PRIMARY KEY DEFAULT 1,
    customer_id VARCHAR(255) NULL,
    instance_id VARCHAR(255) NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT single_identity_row CHECK (id = 1)
);
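The single-row constraint can be tried out with the following sketch. It again assumes SQLite (3.24 or newer, for `ON CONFLICT ... DO UPDATE`) as a stand-in for PostgreSQL, and `save_identity` is a hypothetical helper, not part of the design.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE telemetry_identity (
        id INTEGER PRIMARY KEY DEFAULT 1,
        customer_id TEXT NULL,
        instance_id TEXT NULL,
        CONSTRAINT single_identity_row CHECK (id = 1)
    )
    """
)


def save_identity(customer_id: str, instance_id: str) -> None:
    """Insert the identity row, or overwrite it if it already exists."""
    conn.execute(
        """
        INSERT INTO telemetry_identity (id, customer_id, instance_id)
        VALUES (1, ?, ?)
        ON CONFLICT (id) DO UPDATE SET
            customer_id = excluded.customer_id,
            instance_id = excluded.instance_id
        """,
        (customer_id, instance_id),
    )


save_identity("cust-a", "inst-a")
save_identity("cust-b", "inst-b")  # overwrites the same row

# Any attempt to add a second row violates CHECK (id = 1).
try:
    conn.execute("INSERT INTO telemetry_identity (id, customer_id) VALUES (2, 'x')")
    second_row_rejected = False
except sqlite3.IntegrityError:
    second_row_rejected = True

stored = conn.execute(
    "SELECT id, customer_id, instance_id FROM telemetry_identity"
).fetchall()
print(stored, second_row_rejected)
```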
Design Rationale:
Last collection and upload times are derived from telemetry_metrics instead of being stored separately (MAX(collected_at) for last collection, MAX(uploaded_at) for last successful upload)
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class MetricResult:
    key: str
    value: Any

class MetricsCollector(ABC):
    """Base class for metrics collectors."""

    @abstractmethod
    async def collect(self) -> List[MetricResult]:
        """Collect metrics and return results."""
        pass

    @property
    @abstractmethod
    def collector_name(self) -> str:
        """Unique name for this collector."""
        pass

    def should_collect(self) -> bool:
        """Override to add collection conditions."""
        return True
from typing import Dict, Type, List
import importlib
import pkgutil

class CollectorRegistry:
    """Registry for metrics collectors."""

    def __init__(self):
        self._collectors: Dict[str, Type[MetricsCollector]] = {}

    def register(self, collector_class: Type[MetricsCollector]) -> None:
        """Register a collector class."""
        collector = collector_class()
        self._collectors[collector.collector_name] = collector_class

    def get_all_collectors(self) -> List[MetricsCollector]:
        """Get instances of all registered collectors."""
        return [cls() for cls in self._collectors.values()]

    def discover_collectors(self, package_path: str) -> None:
        """Auto-discover collectors in a package."""
        # Implementation to scan for @register_collector decorators
        pass

# Global registry instance
collector_registry = CollectorRegistry()

def register_collector(name: str):
    """Decorator to register a collector."""
    def decorator(cls: Type[MetricsCollector]) -> Type[MetricsCollector]:
        collector_registry.register(cls)
        return cls
    return decorator
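The `discover_collectors` stub is left unimplemented above. One plausible approach, sketched here under the assumption that importing a module is enough to run its `@register_collector` decorators, is to import every submodule of the collectors package. `import_submodules` is a hypothetical helper name.

```python
import importlib
import pkgutil
from typing import List


def import_submodules(package_name: str) -> List[str]:
    """Import every direct submodule of a package and return their names."""
    package = importlib.import_module(package_name)
    imported = []
    # iter_modules lists the modules found on the package's search path.
    for module_info in pkgutil.iter_modules(package.__path__, prefix=f"{package_name}."):
        # Importing a module executes its class decorators, which is
        # what would add each collector to the registry.
        importlib.import_module(module_info.name)
        imported.append(module_info.name)
    return imported


# Demonstrated on a standard-library package so the sketch runs anywhere.
names = import_submodules("email.mime")
print(names)
```

In the design above, the package name would presumably be the collectors package, but that wiring is not specified in the source.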
from datetime import datetime, timedelta, timezone

from sqlalchemy import func, select

# a_session_maker, UserSettings and StoredConversationMetadata are imported
# from the enterprise storage package.

@register_collector("system_metrics")
class SystemMetricsCollector(MetricsCollector):
    """Collects basic system and usage metrics."""

    @property
    def collector_name(self) -> str:
        return "system_metrics"

    async def collect(self) -> List[MetricResult]:
        results = []

        # Collect user count
        async with a_session_maker() as session:
            user_count_result = await session.execute(select(func.count()).select_from(UserSettings))
            user_count = user_count_result.scalar()
            results.append(MetricResult(
                key="total_users",
                value=user_count
            ))

            # Collect conversation count (last 30 days)
            thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
            conversation_count_result = await session.execute(
                select(func.count()).select_from(StoredConversationMetadata)
                .where(StoredConversationMetadata.created_at >= thirty_days_ago)
            )
            conversation_count = conversation_count_result.scalar()
            results.append(MetricResult(
                key="conversations_30d",
                value=conversation_count
            ))

        return results
class TelemetryCollectionProcessor(MaintenanceTaskProcessor):
    """Maintenance task processor for collecting metrics."""
    collection_interval_days: int = 7

    async def __call__(self, task: MaintenanceTask) -> dict:
        """Collect metrics from all registered collectors."""
        # Check if collection is needed
        if not await self._should_collect():
            return {"status": "skipped", "reason": "too_recent"}

        # Collect metrics from all registered collectors
        all_metrics = {}
        collector_results = {}
        for collector in collector_registry.get_all_collectors():
            try:
                if collector.should_collect():
                    results = await collector.collect()
                    for result in results:
                        all_metrics[result.key] = result.value
                    collector_results[collector.collector_name] = len(results)
            except Exception as e:
                logger.error(f"Collector {collector.collector_name} failed: {e}")
                collector_results[collector.collector_name] = f"error: {e}"

        # Store metrics in database
        async with a_session_maker() as session:
            telemetry_record = TelemetryMetrics(
                metrics_data=all_metrics,
                collected_at=datetime.now(timezone.utc)
            )
            session.add(telemetry_record)
            await session.commit()

        # Note: No need to track last_collection_at separately
        # Can be derived from MAX(collected_at) in telemetry_metrics
        return {
            "status": "completed",
            "metrics_collected": len(all_metrics),
            "collectors_run": collector_results
        }

    async def _should_collect(self) -> bool:
        """Check if collection is needed based on interval."""
        async with a_session_maker() as session:
            # Get last collection time from metrics table
            result = await session.execute(select(func.max(TelemetryMetrics.collected_at)))
            last_collected = result.scalar()
            if not last_collected:
                return True
            time_since_last = datetime.now(timezone.utc) - last_collected
            return time_since_last.days >= self.collection_interval_days
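The interval check in `_should_collect` relies on `timedelta.days`, which counts only whole days. The sketch below shows what that means for a weekly schedule: a run that starts a few seconds short of a full week is treated as too recent. The `tolerance` parameter is a hypothetical addition for illustration and is not part of the design above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_collect(
    last: Optional[datetime],
    now: datetime,
    interval_days: int = 7,
    tolerance: timedelta = timedelta(0),
) -> bool:
    """Compare the full elapsed duration, optionally allowing some slack."""
    if last is None:
        return True
    return (now - last) + tolerance >= timedelta(days=interval_days)


# Previous record written at 02:00:30, next weekly run starts at 02:00:10.
last = datetime(2025, 1, 5, 2, 0, 30, tzinfo=timezone.utc)
now = datetime(2025, 1, 12, 2, 0, 10, tzinfo=timezone.utc)

elapsed = now - last
print(elapsed.days)  # whole days only, so 20 seconds short of a week counts as 6
print(should_collect(last, now))
print(should_collect(last, now, tolerance=timedelta(hours=1)))
```

Whether a tolerance is needed depends on how the maintenance task runner schedules the processor relative to the cron trigger, which the source does not specify.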
import os
from datetime import datetime, timezone

from sqlalchemy import select
from replicated import AsyncReplicatedClient, InstanceStatus

class TelemetryUploadProcessor(MaintenanceTaskProcessor):
    """Maintenance task processor for uploading metrics to Replicated."""
    replicated_publishable_key: str
    replicated_app_slug: str

    async def __call__(self, task: MaintenanceTask) -> dict:
        """Upload pending metrics to Replicated."""
        # Get pending metrics
        async with a_session_maker() as session:
            result = await session.execute(
                select(TelemetryMetrics)
                .where(TelemetryMetrics.uploaded_at.is_(None))
                .order_by(TelemetryMetrics.collected_at)
            )
            pending_metrics = result.scalars().all()

        if not pending_metrics:
            return {"status": "no_pending_metrics"}

        # Get admin email - skip if not available
        admin_email = await self._get_admin_email()
        if not admin_email:
            logger.info("Skipping telemetry upload - no admin email available")
            return {
                "status": "skipped",
                "reason": "no_admin_email",
                "total_processed": 0
            }

        uploaded_count = 0
        failed_count = 0

        async with AsyncReplicatedClient(
            publishable_key=self.replicated_publishable_key,
            app_slug=self.replicated_app_slug
        ) as client:
            # Get or create customer and instance
            customer = await client.customer.get_or_create(
                email_address=admin_email
            )
            instance = await customer.get_or_create_instance()

            # Store customer/instance IDs for future use
            await self._update_telemetry_identity(customer.customer_id, instance.instance_id)

            # Upload each metric batch
            for metric_record in pending_metrics:
                try:
                    # Send individual metrics
                    for key, value in metric_record.metrics_data.items():
                        await instance.send_metric(key, value)

                    # Update instance status
                    await instance.set_status(InstanceStatus.RUNNING)

                    # Mark as uploaded
                    async with a_session_maker() as session:
                        result = await session.execute(
                            select(TelemetryMetrics)
                            .where(TelemetryMetrics.id == metric_record.id)
                        )
                        record = result.scalar_one_or_none()
                        if record:
                            record.uploaded_at = datetime.now(timezone.utc)
                            await session.commit()
                    uploaded_count += 1
                except Exception as e:
                    logger.error(f"Failed to upload metrics {metric_record.id}: {e}")
                    # Update error info
                    async with a_session_maker() as session:
                        result = await session.execute(
                            select(TelemetryMetrics)
                            .where(TelemetryMetrics.id == metric_record.id)
                        )
                        record = result.scalar_one_or_none()
                        if record:
                            record.upload_attempts += 1
                            record.last_upload_error = str(e)
                            await session.commit()
                    failed_count += 1

        # Note: No need to track last_successful_upload_at separately
        # Can be derived from MAX(uploaded_at) in telemetry_metrics
        return {
            "status": "completed",
            "uploaded": uploaded_count,
            "failed": failed_count,
            "total_processed": len(pending_metrics)
        }

    async def _get_admin_email(self) -> str | None:
        """Get administrator email for customer identification."""
        # 1. Check environment variable first
        env_admin_email = os.getenv('OPENHANDS_ADMIN_EMAIL')
        if env_admin_email:
            logger.info("Using admin email from environment variable")
            return env_admin_email

        # 2. Use first active user's email (earliest accepted_tos)
        async with a_session_maker() as session:
            result = await session.execute(
                select(UserSettings)
                .where(UserSettings.email.isnot(None))
                .where(UserSettings.accepted_tos.isnot(None))
                .order_by(UserSettings.accepted_tos.asc())
                .limit(1)
            )
            first_user = result.scalar_one_or_none()
            if first_user and first_user.email:
                logger.info(f"Using first active user email: {first_user.email}")
                return first_user.email

        # No admin email available - skip telemetry upload
        logger.info("No admin email available - skipping telemetry upload")
        return None

    async def _update_telemetry_identity(self, customer_id: str, instance_id: str) -> None:
        """Update or create telemetry identity record."""
        async with a_session_maker() as session:
            result = await session.execute(select(TelemetryIdentity).limit(1))
            identity = result.scalar_one_or_none()
            if not identity:
                identity = TelemetryIdentity()
                session.add(identity)
            identity.customer_id = customer_id
            identity.instance_id = instance_id
            await session.commit()
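The per-batch bookkeeping in the upload loop can be exercised without the database or the Replicated SDK. In the sketch below, `PendingBatch` and `FakeInstance` are hypothetical in-memory stand-ins for a `telemetry_metrics` row and the SDK instance, and the rule that negative values are rejected exists only to force a failure.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass
class PendingBatch:
    """Hypothetical in-memory stand-in for a telemetry_metrics row."""
    metrics_data: Dict[str, Any]
    uploaded_at: Optional[datetime] = None
    upload_attempts: int = 0
    last_upload_error: Optional[str] = None


class FakeInstance:
    """Hypothetical stand-in for the SDK instance; rejects negative values."""

    def __init__(self) -> None:
        self.sent: List[tuple] = []

    async def send_metric(self, key: str, value: Any) -> None:
        if isinstance(value, (int, float)) and value < 0:
            raise ValueError(f"invalid value for {key}")
        self.sent.append((key, value))


async def upload_batches(instance: FakeInstance, batches: List[PendingBatch]) -> Dict[str, int]:
    uploaded = failed = 0
    for batch in batches:
        try:
            for key, value in batch.metrics_data.items():
                await instance.send_metric(key, value)
            # Only a fully sent batch is stamped as uploaded.
            batch.uploaded_at = datetime.now(timezone.utc)
            uploaded += 1
        except Exception as exc:
            # A failed batch stays pending and is retried on the next run.
            batch.upload_attempts += 1
            batch.last_upload_error = str(exc)
            failed += 1
    return {"uploaded": uploaded, "failed": failed}


batches = [PendingBatch({"total_users": 12}), PendingBatch({"total_users": -1})]
summary = asyncio.run(upload_batches(FakeInstance(), batches))
print(summary)
```

One consequence of this structure is that a batch which fails partway through is re-sent in full on the next run, since `uploaded_at` is only set after every metric in the batch has been sent.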
import os
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter
from sqlalchemy import func, select

license_router = APIRouter()

@license_router.get("/license-status")
async def get_license_status():
    """Get license warning status for UI display."""
    # Only show warnings for OHE installations
    if not _is_openhands_enterprise():
        return {"warn": False, "message": ""}

    async with a_session_maker() as session:
        # Get last successful upload time from metrics table
        result = await session.execute(
            select(func.max(TelemetryMetrics.uploaded_at))
            .where(TelemetryMetrics.uploaded_at.isnot(None))
        )
        last_upload = result.scalar()

        if not last_upload:
            # No successful upload recorded yet: report a pending-verification warning
            return {
                "warn": True,
                "message": "OpenHands Enterprise license verification pending. Please ensure network connectivity."
            }

        # Check if last successful upload was more than 4 days ago
        days_since_upload = (datetime.now(timezone.utc) - last_upload).days
        if days_since_upload > 4:
            # Find oldest unsent batch
            result = await session.execute(
                select(TelemetryMetrics)
                .where(TelemetryMetrics.uploaded_at.is_(None))
                .order_by(TelemetryMetrics.collected_at)
                .limit(1)
            )
            oldest_unsent = result.scalar_one_or_none()
            if oldest_unsent:
                # Calculate expiration date (oldest unsent + 34 days)
                expiration_date = oldest_unsent.collected_at + timedelta(days=34)
                days_until_expiration = (expiration_date - datetime.now(timezone.utc)).days
                if days_until_expiration <= 0:
                    message = "Your OpenHands Enterprise license has expired. Please contact support immediately."
                else:
                    message = f"Your OpenHands Enterprise license will expire in {days_until_expiration} days. Please contact support if this issue persists."
                return {"warn": True, "message": message}

    return {"warn": False, "message": ""}

def _is_openhands_enterprise() -> bool:
    """Detect if this is an OHE installation."""
    # Check for required OHE environment variables
    required_vars = [
        'GITHUB_APP_CLIENT_ID',
        'KEYCLOAK_SERVER_URL',
        'KEYCLOAK_REALM_NAME'
    ]
    return all(os.getenv(var) for var in required_vars)
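The date arithmetic in the endpoint is easier to check when it is pulled out of the database code. The sketch below restates the same rules as a pure function. `license_status` and the two constants are hypothetical names introduced for illustration, and the inputs stand for `MAX(uploaded_at)` and the `collected_at` of the oldest unsent batch.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

WARN_AFTER_DAYS = 4     # warning threshold used by the endpoint
EXPIRY_AFTER_DAYS = 34  # offset applied to the oldest unsent batch


def license_status(
    now: datetime,
    last_upload: Optional[datetime],
    oldest_unsent: Optional[datetime],
) -> Tuple[bool, str]:
    """Return (warn, message) following the same rules as the endpoint."""
    if last_upload is None:
        return True, (
            "OpenHands Enterprise license verification pending. "
            "Please ensure network connectivity."
        )
    if (now - last_upload).days > WARN_AFTER_DAYS and oldest_unsent is not None:
        days_left = (oldest_unsent + timedelta(days=EXPIRY_AFTER_DAYS) - now).days
        if days_left <= 0:
            return True, (
                "Your OpenHands Enterprise license has expired. "
                "Please contact support immediately."
            )
        return True, (
            f"Your OpenHands Enterprise license will expire in {days_left} days. "
            "Please contact support if this issue persists."
        )
    return False, ""


now = datetime(2025, 2, 10, 12, 0, tzinfo=timezone.utc)
# Last upload 5 days ago, oldest unsent batch collected 4 days ago.
warn, message = license_status(now, now - timedelta(days=5), now - timedelta(days=4))
print(warn, message)
```

With a batch that has been pending for 4 days, the 34-day offset leaves 30 days on the countdown, which lines up with the example banner text earlier in this document.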
The frontend will poll the license status endpoint and display warnings using the existing banner component pattern:
// New component: LicenseWarningBanner.tsx
import { useEffect, useState } from "react";
import { FaExclamationTriangle } from "react-icons/fa";

interface LicenseStatus {
  warn: boolean;
  message: string;
}

export function LicenseWarningBanner() {
  const [licenseStatus, setLicenseStatus] = useState<LicenseStatus>({ warn: false, message: "" });

  useEffect(() => {
    const checkLicenseStatus = async () => {
      try {
        const response = await fetch('/api/license-status');
        const status = await response.json();
        setLicenseStatus(status);
      } catch (error) {
        console.error('Failed to check license status:', error);
      }
    };

    // Check immediately and then every hour
    checkLicenseStatus();
    const interval = setInterval(checkLicenseStatus, 60 * 60 * 1000);
    return () => clearInterval(interval);
  }, []);

  if (!licenseStatus.warn) {
    return null;
  }

  return (
    <div className="bg-red-600 text-white p-4 rounded flex items-center justify-between">
      <div className="flex items-center">
        <FaExclamationTriangle className="mr-3" />
        <span>{licenseStatus.message}</span>
      </div>
    </div>
  );
}
The cronjob configurations will be deployed via the OpenHands-Cloud helm charts.
The collection cronjob runs weekly to gather metrics:
# charts/openhands/templates/telemetry-collection-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ include "openhands.fullname" . }}-telemetry-collection
  labels:
    {{- include "openhands.labels" . | nindent 4 }}
spec:
  schedule: "0 2 * * 0"  # Weekly on Sunday at 2 AM
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: telemetry-collector
            image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
            env:
              {{- include "openhands.env" . | nindent 12 }}
            command:
            - python
            - -c
            - |
              import asyncio
              from enterprise.storage.maintenance_task import MaintenanceTask, MaintenanceTaskStatus
              from enterprise.storage.database import a_session_maker
              from enterprise.server.telemetry.collection_processor import TelemetryCollectionProcessor

              async def main():
                  # Create collection task
                  processor = TelemetryCollectionProcessor()
                  task = MaintenanceTask()
                  task.set_processor(processor)
                  task.status = MaintenanceTaskStatus.PENDING
                  async with a_session_maker() as session:
                      session.add(task)
                      await session.commit()

              asyncio.run(main())
          restartPolicy: OnFailure
The upload cronjob runs daily to send metrics to Replicated:
# charts/openhands/templates/telemetry-upload-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ include "openhands.fullname" . }}-telemetry-upload
  labels:
    {{- include "openhands.labels" . | nindent 4 }}
spec:
  schedule: "0 3 * * *"  # Daily at 3 AM
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: telemetry-uploader
            image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
            env:
              {{- include "openhands.env" . | nindent 12 }}
            - name: REPLICATED_PUBLISHABLE_KEY
              valueFrom:
                secretKeyRef:
                  name: {{ include "openhands.fullname" . }}-replicated-config
                  key: publishable-key
            - name: REPLICATED_APP_SLUG
              value: {{ .Values.telemetry.replicatedAppSlug | default "openhands-enterprise" | quote }}
            command:
            - python
            - -c
            - |
              import asyncio
              from enterprise.storage.maintenance_task import MaintenanceTask, MaintenanceTaskStatus
              from enterprise.storage.database import a_session_maker
              from enterprise.server.telemetry.upload_processor import TelemetryUploadProcessor
              import os

              async def main():
                  # Create upload task
                  processor = TelemetryUploadProcessor(
                      replicated_publishable_key=os.getenv('REPLICATED_PUBLISHABLE_KEY'),
                      replicated_app_slug=os.getenv('REPLICATED_APP_SLUG', 'openhands-enterprise')
                  )
                  task = MaintenanceTask()
                  task.set_processor(processor)
                  task.status = MaintenanceTaskStatus.PENDING
                  async with a_session_maker() as session:
                      session.add(task)
                      await session.commit()

              asyncio.run(main())
          restartPolicy: OnFailure
All implementation must pass existing lints and tests. New functionality requires comprehensive unit tests with >90% coverage. Integration tests should verify end-to-end telemetry flow including collection, storage, upload, and warning display.
Repository: OpenHands
Establish the foundational database schema and SQLAlchemy models for telemetry data storage.
enterprise/migrations/versions/077_create_telemetry_tables.py
enterprise/storage/telemetry_metrics.py
enterprise/storage/telemetry_config.py
enterprise/tests/unit/storage/test_telemetry_metrics.py
enterprise/tests/unit/storage/test_telemetry_config.py
Demo: Database tables created and models can store/retrieve telemetry data.
Repository: OpenHands
Implement the pluggable metrics collection system with registry and base classes.
enterprise/server/telemetry/__init__.py
enterprise/server/telemetry/collector_base.py
enterprise/server/telemetry/collector_registry.py
enterprise/server/telemetry/decorators.py
enterprise/server/telemetry/collectors/__init__.py
enterprise/server/telemetry/collectors/system_metrics.py
enterprise/server/telemetry/collectors/user_activity.py
enterprise/tests/unit/telemetry/test_collector_base.py
enterprise/tests/unit/telemetry/test_collector_registry.py
enterprise/tests/unit/telemetry/test_system_metrics.py
Demo: Developers can create new collectors with a single file change using the @register_collector decorator.
Repository: OpenHands
Implement maintenance task processors for collecting metrics and uploading to Replicated.
enterprise/server/telemetry/collection_processor.py
enterprise/tests/unit/telemetry/test_collection_processor.py
enterprise/server/telemetry/upload_processor.py
enterprise/tests/unit/telemetry/test_upload_processor.py
enterprise/tests/integration/test_telemetry_flow.py
Demo: Metrics are automatically collected weekly and uploaded daily to Replicated vendor portal.
Repository: OpenHands
Implement the license status endpoint for the warning system.
enterprise/server/routes/license.py
enterprise/tests/unit/routes/test_license.py
enterprise/saas_server.py to include license router
Demo: License status API returns warning status based on telemetry upload success.
Repository: OpenHands
Implement the frontend warning banner component and integration.
frontend/src/components/features/license/license-warning-banner.tsx
frontend/src/components/features/license/license-warning-banner.test.tsx
Demo: License warnings appear in UI when telemetry uploads fail for >4 days, with accurate expiration countdown.
Repository: OpenHands-Cloud
Create Kubernetes cronjob configurations and deployment scripts.
charts/openhands/templates/telemetry-collection-cronjob.yaml
charts/openhands/templates/telemetry-upload-cronjob.yaml
charts/openhands/templates/replicated-secret.yaml
charts/openhands/values.yaml with telemetry configuration options:
# Add to values.yaml
telemetry:
  enabled: true
  replicatedAppSlug: "openhands-enterprise"
  adminEmail: ""  # Optional: admin email for customer identification

# Add to deployment environment variables
env:
  OPENHANDS_ADMIN_EMAIL: "{{ .Values.telemetry.adminEmail }}"
Demo: Complete telemetry system deployed via helm chart with configurable collection intervals and Replicated integration.
Repository: OpenHands
Add comprehensive metrics collectors, monitoring capabilities, and documentation.
enterprise/server/telemetry/collectors/conversation_metrics.py
enterprise/server/telemetry/collectors/integration_usage.py
enterprise/server/telemetry/collectors/performance_metrics.py
enterprise/server/telemetry/monitoring.py
enterprise/tests/e2e/test_telemetry_system.py
enterprise/server/telemetry/README.md
Demo: Rich telemetry data flowing to vendor portal with comprehensive monitoring, alerting for system health, and complete documentation.