backend/docs/observability.md
The PentAGI Observability Stack provides comprehensive monitoring, logging, tracing, and metrics collection for the application. It integrates multiple technologies to give a complete view of the application's behavior, performance, and health.
This document explains how the observability stack is designed, configured, and used by developers.
The Observability stack is built as a set of layered interfaces that integrate multiple observability technologies. It uses OpenTelemetry as the foundation for metrics, logs, and traces, with additional integrations for Langfuse (LLM-specific observability) and Go's native profiling.
```mermaid
flowchart TD
    App[PentAGI Application] --> Observer[Observability Interface]

    subgraph ObservabilityComponents
        Observer --> Tracer[Tracer Interface]
        Observer --> Meter[Meter Interface]
        Observer --> Collector[Collector Interface]
        Observer --> LangfuseInt[Langfuse Interface]
    end

    Tracer --> OtelTracer[OpenTelemetry Tracer]
    Meter --> OtelMeter[OpenTelemetry Meter]
    Collector --> Metrics[System Metrics Collection]
    LangfuseInt --> LangfuseClient[Langfuse Client]

    OtelTracer --> OtelCollector[OpenTelemetry Collector]
    OtelMeter --> OtelCollector
    LogrusHook[Logrus Hook] --> OtelCollector

    OtelCollector --> VictoriaMetrics[VictoriaMetrics]
    OtelCollector --> Jaeger[Jaeger]
    OtelCollector --> Loki[Loki]

    Profiling[Profiling Server] --> App
    LangfuseClient --> LangfuseBackend[Langfuse Backend]

    VictoriaMetrics --> Grafana[Grafana]
    Jaeger --> Grafana
    Loki --> Grafana
```
```mermaid
sequenceDiagram
    participant App as PentAGI App
    participant Obs as Observability
    participant OTEL as OpenTelemetry
    participant Lf as Langfuse
    participant Backend as Observability Backend

    App->>Obs: Log Message
    Obs->>OTEL: Format & Forward Log
    OTEL->>Backend: Export Log

    App->>Obs: Create Span
    Obs->>OTEL: Create & Configure Span
    OTEL->>Backend: Export Span

    App->>Obs: Record Metric
    Obs->>OTEL: Format & Record Metric
    OTEL->>Backend: Export Metric

    App->>Obs: New Observation
    Obs->>Lf: Create Observation
    Lf->>Backend: Export Observation

    App->>App: Access Profiling Endpoint
```
The observability stack is designed around several interfaces that abstract the underlying implementations:
```go
// Observability is the primary interface that combines all observability features
type Observability interface {
    Flush(ctx context.Context) error
    Shutdown(ctx context.Context) error

    Meter
    Tracer
    Collector
    Langfuse
}

// Tracer provides span creation and management
type Tracer interface {
    // NewSpan creates a new span with the given kind and component name
    NewSpan(
        context.Context,
        oteltrace.SpanKind,
        string,
        ...oteltrace.SpanStartOption,
    ) (context.Context, oteltrace.Span)

    // NewSpanWithParent creates a span with explicit parent trace and span IDs
    NewSpanWithParent(
        context.Context,
        oteltrace.SpanKind,
        string,
        string,
        string,
        ...oteltrace.SpanStartOption,
    ) (context.Context, oteltrace.Span)

    // SpanFromContext extracts the current span from context
    SpanFromContext(ctx context.Context) oteltrace.Span

    // SpanContextFromContext extracts span context from context
    SpanContextFromContext(ctx context.Context) oteltrace.SpanContext
}

// Meter provides metric recording capabilities
type Meter interface {
    // Various counter, gauge, and histogram creation methods for
    // both synchronous and asynchronous metrics
    NewInt64Counter(string, ...otelmetric.Int64CounterOption) (otelmetric.Int64Counter, error)
    NewFloat64Counter(string, ...otelmetric.Float64CounterOption) (otelmetric.Float64Counter, error)
    // ... other metric types (removed for brevity)
}

// Collector provides system metric collection
type Collector interface {
    // StartProcessMetricCollect starts collecting process metrics
    StartProcessMetricCollect(attrs ...attribute.KeyValue) error

    // StartGoRuntimeMetricCollect starts collecting Go runtime metrics
    StartGoRuntimeMetricCollect(attrs ...attribute.KeyValue) error

    // StartDumperMetricCollect starts collecting metrics from a custom dumper
    StartDumperMetricCollect(stats Dumper, attrs ...attribute.KeyValue) error
}

// Dumper interface for custom metric collection
type Dumper interface {
    DumpStats() (map[string]float64, error)
}

// Langfuse provides LLM observability
type Langfuse interface {
    // NewObservation creates a new Langfuse observation
    NewObservation(
        context.Context,
        ...langfuse.ObservationContextOption,
    ) (context.Context, langfuse.Observation)
}
```
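Any type that implements `DumpStats` can be handed to `StartDumperMetricCollect` to export component-specific metrics. A minimal sketch, assuming only the `Dumper` interface shown above (the `queueStats` type and its metric names are hypothetical):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Dumper mirrors the interface defined by the observability package.
type Dumper interface {
	DumpStats() (map[string]float64, error)
}

// queueStats is a hypothetical component that tracks its own counters.
type queueStats struct {
	enqueued atomic.Int64
	dropped  atomic.Int64
}

// DumpStats exposes the counters as a flat map of metric names to values,
// which the collector can read periodically and export.
func (q *queueStats) DumpStats() (map[string]float64, error) {
	return map[string]float64{
		"queue.enqueued": float64(q.enqueued.Load()),
		"queue.dropped":  float64(q.dropped.Load()),
	}, nil
}

func main() {
	q := &queueStats{}
	q.enqueued.Add(3)
	q.dropped.Add(1)

	var d Dumper = q // compile-time check that queueStats satisfies Dumper
	stats, err := d.DumpStats()
	fmt.Println(stats["queue.enqueued"], stats["queue.dropped"], err)
	// prints: 3 1 <nil>
}
```

In the application this would presumably be wired up with `obs.Observer.StartDumperMetricCollect(q)`, which polls `DumpStats` and records each entry.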
The observability stack relies on several backend services for storing and visualizing the collected data.
The infrastructure includes the following components: the OpenTelemetry Collector (central telemetry pipeline), VictoriaMetrics (metrics storage), Jaeger (trace storage), Loki (log storage), Grafana (dashboards and visualization), and the Langfuse backend (LLM observability).
The observability stack can be deployed using Docker Compose:
```bash
# Start the observability stack
docker-compose -f docker-compose-observability.yml up -d
```
For detailed setup instructions, refer to the README.md file in the repository.
The observability stack is configured through environment variables in the application's configuration:
| Variable | Description | Example Value |
|---|---|---|
| `OTEL_HOST` | OpenTelemetry collector endpoint | `otel:4318` |
| `LANGFUSE_BASE_URL` | Langfuse API base URL | `http://langfuse-web:3000` |
| `LANGFUSE_PROJECT_ID` | Langfuse project ID | `cm47619l0000872mcd2dlbqwb` |
| `LANGFUSE_PUBLIC_KEY` | Langfuse public API key | `pk-lf-5946031c-ae6c-4451-98d2-9882a59e1707` |
| `LANGFUSE_SECRET_KEY` | Langfuse secret API key | `sk-lf-d9035680-89dd-4950-8688-7870720bf359` |
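These variables are typically supplied through the process environment, for example via an `.env` file. The values below simply mirror the examples from the table and are illustrative only:

```shell
# OpenTelemetry collector endpoint (host:port)
OTEL_HOST=otel:4318

# Langfuse connection settings (example values, replace with your own)
LANGFUSE_BASE_URL=http://langfuse-web:3000
LANGFUSE_PROJECT_ID=cm47619l0000872mcd2dlbqwb
LANGFUSE_PUBLIC_KEY=pk-lf-5946031c-ae6c-4451-98d2-9882a59e1707
LANGFUSE_SECRET_KEY=sk-lf-d9035680-89dd-4950-8688-7870720bf359
```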
The observability stack is initialized in the application through the InitObserver function:
```go
// Initialize clients first
lfclient, err := obs.NewLangfuseClient(ctx, cfg)
if err != nil && !errors.Is(err, obs.ErrNotConfigured) {
    log.Fatalf("Unable to create langfuse client: %v\n", err)
}

otelclient, err := obs.NewTelemetryClient(ctx, cfg)
if err != nil && !errors.Is(err, obs.ErrNotConfigured) {
    log.Fatalf("Unable to create telemetry client: %v\n", err)
}

// Initialize the observer
obs.InitObserver(ctx, lfclient, otelclient, []logrus.Level{
    logrus.DebugLevel,
    logrus.InfoLevel,
    logrus.WarnLevel,
    logrus.ErrorLevel,
})

// Start metrics collection
if err := obs.Observer.StartProcessMetricCollect(); err != nil {
    log.Printf("Failed to start process metric collection: %v", err)
}
if err := obs.Observer.StartGoRuntimeMetricCollect(); err != nil {
    log.Printf("Failed to start Go runtime metric collection: %v", err)
}

// Start profiling if needed
go profiling.Start()
```
This section explains how to use the observability stack's features in application code.
The observability stack integrates deeply with logrus for logging. Logged messages are automatically associated with the current span and exported to the observability backend.
The observability package implements a logrus hook that captures log entries and incorporates them into the OpenTelemetry tracing and logging systems:
```go
// InitObserver sets up the logrus hook
obs.InitObserver(ctx, lfclient, otelclient, []logrus.Level{
    logrus.DebugLevel,
    logrus.InfoLevel,
    logrus.WarnLevel,
    logrus.ErrorLevel,
})
```
The hook is implemented in the `observer` struct. When fired, it automatically attaches each log entry as an event to the current span (creating a new span when none is active) and emits the entry to the OpenTelemetry log pipeline.

**Fire method**: the implementation of the logrus hook in the observer:
```go
// Fire is a logrus hook that is fired on a new log entry.
func (obs *observer) Fire(entry *logrus.Entry) error {
    // Extract context or create a new one
    ctx := entry.Context
    if ctx == nil {
        ctx = context.Background()
    }

    // Get current span from context
    span := oteltrace.SpanFromContext(ctx)
    if !span.IsRecording() {
        // Create a new span for logs without a valid span
        component := "internal"
        if op, ok := entry.Data["component"]; ok {
            component = op.(string)
        }
        _, span = obs.NewSpanWithParent(
            ctx,
            oteltrace.SpanKindInternal,
            component,
            // ... span creation details
        )
        defer span.End()
    }

    // Add log as an event to the span
    span.AddEvent("log", oteltrace.WithAttributes(obs.makeAttrs(entry, span)...))

    // Send to OpenTelemetry log pipeline
    obs.logger.Emit(ctx, obs.makeRecord(entry, span))

    return nil
}
```
For proper trace correlation, logs should include the request context. This allows the observability system to associate logs with the correct trace and span:
```go
// WithContext is critical for associating logs with the correct trace
logrus.WithContext(ctx).Info("Operation completed")

// Without context, logs may not be associated with the correct trace
logrus.Info("This log may not be properly correlated") // Avoid this

// Example with error and fields
logrus.WithContext(ctx).WithFields(logrus.Fields{
    "user_id": userID,
    "action":  "login",
}).WithError(err).Error("Authentication failed")
```
When a log entry includes a context with an active span, the system attaches the log as an event to that span, so the entry appears inside the correct trace.

If a log entry does not include a context (or the context has no active span), the system creates a new internal span for the entry, using the `component` field as the span name when present, so the log is still exported with valid trace identifiers.
The observability system enriches logs with trace and span information automatically:
```go
// Create a span
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "process-request")
defer span.End()

// All logs with this context will be associated with the span
logrus.WithContext(ctx).Info("Starting processing")

// Even errors are properly correlated and include stacktraces
if err != nil {
    logrus.WithContext(ctx).WithError(err).Error("Processing failed")
    // The error is automatically added to the span
}
```
The observer converts logrus fields to span attributes and OpenTelemetry log records:
```go
// A log with fields
logrus.WithContext(ctx).WithFields(logrus.Fields{
    "user_id":     1234,
    "request_id":  "abc-123",
    "duration_ms": 42,
}).Info("Request processed")

// Results in span attributes:
// - log.severity: INFO
// - log.message: Request processed
// - log.user_id: 1234
// - log.request_id: abc-123
// - log.duration_ms: 42
```
This integration ensures that logs, traces, and metrics all share consistent context, making it easier to correlate events across the system.
Traces provide a way to track the flow of requests through the system.
Spans should be created for significant operations in the code. Each span represents a unit of work:
```go
// Create a new span for a significant operation
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "process-request")
defer span.End() // Always end spans, preferably with defer
```
Span creation follows these principles:

- **Span Hierarchy**: Spans created with a context containing an active span become child spans.
- **Span Kinds**: Different span kinds represent different types of operations:
  - `SpanKindInternal`: internal operations (the default)
  - `SpanKindServer`: handling incoming requests
  - `SpanKindClient`: making outgoing requests
  - `SpanKindProducer`: sending messages
  - `SpanKindConsumer`: receiving messages
- **Component Names**: The third parameter to `NewSpan` identifies the component and becomes the span name.
- **Empty Spans**: Even spans without explicit attributes or events are valuable, as they record the operation's timing and preserve the parent/child structure of the trace.
```go
// Parent operation
ctx, parentSpan := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "parent-operation")
defer parentSpan.End()

// Child operation - automatically becomes a child span in the trace
ctx, childSpan := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "child-operation")
defer childSpan.End()
```
Context propagation is critical for maintaining trace continuity:
```go
// Create a span in function A
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "function-a")
defer span.End()

// Pass the context to function B
resultB := functionB(ctx, param1, param2)

// Inside function B, create a child span
func functionB(ctx context.Context, param1, param2 string) Result {
    // This will be a child span of the span in function A
    ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "function-b")
    defer span.End()

    // ...function logic...
}
```
The context carries the current span and its trace identifiers, so any downstream operation can attach itself to the same trace.
Always use the updated context returned from `NewSpan()`:

```go
// CORRECT: Using the updated context
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "operation")
// Pass the updated ctx to subsequent operations

// INCORRECT: Not using the updated context
_, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "operation")
// Subsequent operations won't be part of the same trace
```
Metrics provide quantitative measurements of the application's behavior:
```go
// Create a counter
requestCounter, _ := obs.Observer.NewInt64Counter(
    "app.requests.total",
    otelmetric.WithDescription("Total number of requests"),
)

// Increment the counter
requestCounter.Add(ctx, 1,
    attribute.String("endpoint", "/api/users"),
    attribute.String("method", "GET"),
)

// Create a histogram for measuring latencies
latencyHistogram, _ := obs.Observer.NewFloat64Histogram(
    "app.request.duration",
    otelmetric.WithDescription("Request duration in seconds"),
)

// Record a duration
startTime := time.Now()
// ... perform operation ...
duration := time.Since(startTime).Seconds()
latencyHistogram.Record(ctx, duration,
    attribute.String("endpoint", "/api/users"),
    attribute.String("method", "GET"),
)
```
Langfuse provides specialized observability for LLM operations and agentic workflows with automatic data conversion to OpenAI-compatible format:
```go
// Create a new observation for an LLM operation
ctx, observation := obs.Observer.NewObservation(ctx,
    langfuse.WithObservationTraceContext(
        langfuse.WithTraceName("flow-execution"),
        langfuse.WithTraceUserId(user.Email),
    ),
)

// LangChainGo messages are automatically converted to OpenAI format
messages := []*llms.MessageContent{
    {
        Role: llms.ChatMessageTypeHuman,
        Parts: []llms.ContentPart{
            llms.TextContent{Text: "Analyze this vulnerability"},
        },
    },
}

// Create a generation for an LLM request
generation := observation.Generation(
    langfuse.WithGenerationName("content-generation"),
    langfuse.WithGenerationModel("gpt-4"),
    langfuse.WithGenerationInput(messages), // Auto-converted to OpenAI format
)

// Complete the generation with result
output := &llms.ContentChoice{
    Content:   "Based on analysis...",
    ToolCalls: []llms.ToolCall{...},
}

generation.End(
    langfuse.WithGenerationOutput(output), // Auto-converted to OpenAI format
    langfuse.WithGenerationUsage(&langfuse.GenerationUsage{
        Input:  promptTokens,
        Output: responseTokens,
        Unit:   langfuse.GenerationUsageUnitTokens,
    }),
)
```
Key features: LangChainGo inputs and outputs are converted to OpenAI-compatible format automatically, and token usage can be attached to each generation.
Langfuse supports additional observation types for comprehensive agentic system monitoring:
Agent Observations for autonomous reasoning processes:
```go
agent := observation.Agent(
    langfuse.WithAgentName("task-executor"),
    langfuse.WithAgentInput(taskDescription),
    langfuse.WithAgentMetadata(langfuse.Metadata{
        "agent_type":   "executor",
        "capabilities": []string{"code_execution", "file_operations"},
    }),
)

result := executeTask(ctx, taskDescription)

agent.End(
    langfuse.WithAgentOutput(result),
    langfuse.WithAgentStatus("completed"),
)
```
Tool Observations for tracking tool executions:
```go
tool := observation.Tool(
    langfuse.WithToolName("search-tool"),
    langfuse.WithToolInput(searchQuery),
)

results := performSearch(ctx, searchQuery)

tool.End(
    langfuse.WithToolOutput(results),
    langfuse.WithToolStatus("success"),
)
```
Chain Observations for multi-step reasoning:
```go
chain := observation.Chain(
    langfuse.WithChainName("reasoning-chain"),
    langfuse.WithChainInput(messages),
    langfuse.WithChainMetadata(langfuse.Metadata{
        "steps": 3,
        "model": "gpt-4",
    }),
)

finalAnswer := executeChain(ctx, messages)

chain.End(
    langfuse.WithChainOutput(finalAnswer),
    langfuse.WithChainStatus("completed"),
)
```
Retriever Observations for information retrieval:
```go
retriever := observation.Retriever(
    langfuse.WithRetrieverName("vector-search"),
    langfuse.WithRetrieverInput(query),
)

documents := vectorStore.Search(ctx, query)

retriever.End(
    langfuse.WithRetrieverOutput(documents),
    langfuse.WithRetrieverStatus("success"),
)
```
Evaluator Observations for quality assessment:
```go
evaluator := observation.Evaluator(
    langfuse.WithEvaluatorName("quality-check"),
    langfuse.WithEvaluatorInput(response),
)

score := evaluateQuality(ctx, response)

evaluator.End(
    langfuse.WithEvaluatorOutput(score),
    langfuse.WithEvaluatorStatus("completed"),
)
```
Embedding Observations for vector generation:
```go
embedding := observation.Embedding(
    langfuse.WithEmbeddingName("text-embedding"),
    langfuse.WithEmbeddingInput(text),
)

vector := generateEmbedding(ctx, text)

embedding.End(
    langfuse.WithEmbeddingOutput(vector),
    langfuse.WithEmbeddingStatus("success"),
)
```
Guardrail Observations for safety checks:
```go
guardrail := observation.Guardrail(
    langfuse.WithGuardrailName("safety-filter"),
    langfuse.WithGuardrailInput(userInput),
)

passed, violations := checkSafety(ctx, userInput)

guardrail.End(
    langfuse.WithGuardrailOutput(map[string]any{
        "passed":     passed,
        "violations": violations,
    }),
    langfuse.WithGuardrailStatus(fmt.Sprintf("passed=%t", passed)),
)
```
For detailed information about Langfuse integration, data conversion, and advanced patterns, see Langfuse Integration Documentation.
The observability stack includes a profiling server that exposes Go's standard profiling endpoints:
The profiling server starts automatically when `profiling.Start()` is called and listens on port 7777 by default. Profiles are available at:

- CPU profile: `http://localhost:7777/profiler/profile`
- Heap profile: `http://localhost:7777/profiler/heap`
- Goroutine profile: `http://localhost:7777/profiler/goroutine`
- Block profile: `http://localhost:7777/profiler/block`
- Mutex profile: `http://localhost:7777/profiler/mutex`
- Execution trace: `http://localhost:7777/profiler/trace`
You can use standard Go tools to collect and analyze profiles:
```bash
# Collect a 30-second CPU profile
go tool pprof http://localhost:7777/profiler/profile

# Collect a heap profile
go tool pprof http://localhost:7777/profiler/heap

# Collect a 5-second execution trace
curl -o trace.out "http://localhost:7777/profiler/trace?seconds=5"
go tool trace trace.out
```
PentAGI uses several patterns for instrumenting different parts of the application. These patterns demonstrate best practices for integrating the observability stack.
The PentAGI application uses a Gin middleware to instrument HTTP requests, located in pkg/server/logger/logger.go:
```go
// WithGinLogger creates a middleware that logs HTTP requests with tracing
func WithGinLogger(service string) gin.HandlerFunc {
    return func(c *gin.Context) {
        // Record start time for duration calculation
        start := time.Now()

        // Extract URI and query parameters
        uri := c.Request.URL.Path
        raw := c.Request.URL.RawQuery
        if raw != "" {
            uri = uri + "?" + raw
        }

        // Create structured log with HTTP request details
        entry := logrus.WithFields(logrus.Fields{
            "component":      "api",
            "net_peer_ip":    c.ClientIP(),
            "http_uri":       uri,
            "http_path":      c.Request.URL.Path,
            "http_host_name": c.Request.Host,
            "http_method":    c.Request.Method,
        })

        // Add request type information
        if c.FullPath() == "" {
            entry = entry.WithField("request", "proxy handled")
        } else {
            entry = entry.WithField("request", "api handled")
        }

        // Proceed with the request
        c.Next()

        // Include any Gin errors
        if len(c.Errors) > 0 {
            entry = entry.WithField("gin.errors", c.Errors.String())
        }

        // Add response information and duration
        entry = entry.WithFields(logrus.Fields{
            "duration":         time.Since(start).String(),
            "http_status_code": c.Writer.Status(),
            "http_resp_size":   c.Writer.Size(),
        }).WithContext(c.Request.Context())

        // Log appropriate level based on status code
        if c.Writer.Status() >= 400 {
            entry.Error("http request handled error")
        } else {
            entry.Debug("http request handled success")
        }
    }
}
```
This middleware records the client IP, URI, host, and method as structured fields, distinguishes proxied from API-handled requests, captures any Gin errors, measures request duration, and logs at error level for responses with status >= 400 (debug otherwise).
To use this middleware in your Gin application:
```go
// Setup the router with the logging middleware
router := gin.New()
router.Use(logger.WithGinLogger("api-service"))
```
PentAGI also provides instrumentation for GraphQL operations:
```go
// WithGqlLogger creates middleware that instruments GraphQL operations
func WithGqlLogger(service string) func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response {
    return func(ctx context.Context, next graphql.ResponseHandler) *graphql.Response {
        // Create a span for the GraphQL operation
        ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindServer, "graphql.handler")
        defer span.End()

        // Record start time
        start := time.Now()
        entry := logrus.WithContext(ctx).WithField("component", service)

        // Execute the GraphQL operation
        res := next(ctx)

        // Add operation details to the logs
        op := graphql.GetOperationContext(ctx)
        if op != nil && op.Operation != nil {
            entry = entry.WithFields(logrus.Fields{
                "operation_name": op.OperationName,
                "operation_type": op.Operation.Operation,
            })
        }

        // Add duration information
        entry = entry.WithField("duration", time.Since(start).String())

        // Log errors if present
        if res == nil {
            return res
        }
        if len(res.Errors) > 0 {
            entry = entry.WithField("gql.errors", res.Errors.Error())
            entry.Error("graphql request handled with errors")
        } else {
            entry.Debug("graphql request handled success")
        }

        return res
    }
}
```
This middleware creates a server span for each GraphQL operation, records the operation name and type, measures duration, and logs any errors returned in the GraphQL response.
To use this middleware in your GraphQL server:
```go
// Configure the GraphQL server with the logging middleware
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{
    Resolvers: &graph.Resolver{},
}))
srv.AroundOperations(logger.WithGqlLogger("graphql-service"))
```
Always propagate context through your application to maintain trace continuity:
```go
// Pass context to functions and methods
func ProcessRequest(ctx context.Context, req Request) {
    // Use the context for spans, logs, etc.
    logrus.WithContext(ctx).Info("Processing request")

    // Pass the context to downstream functions
    result, err := fetchData(ctx, req.ID)
    // ...handle result and err...
}
```
Use structured logging with consistent field names:
```go
// Define common field names
const (
    FieldUserID    = "user_id"
    FieldRequestID = "request_id"
    FieldComponent = "component"
)

// Use them consistently
logrus.WithFields(logrus.Fields{
    FieldUserID:    user.ID,
    FieldRequestID: reqID,
    FieldComponent: "auth-service",
}).Info("User authenticated")
```
Create spans that represent logical operations:
```go
// Good: spans represent logical operations
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "validate-user-input")
defer span.End()

// Bad: spans are too fine-grained or too coarse
ctx, span := obs.Observer.NewSpan(ctx, obs.SpanKindInternal, "process-entire-request")
defer span.End()
```
Design metrics to answer specific questions:
```go
// Good: metrics that help troubleshoot
cacheHitCounter, _ := obs.Observer.NewInt64Counter("cache.hits")
cacheMissCounter, _ := obs.Observer.NewInt64Counter("cache.misses")

// Good: metrics with dimensions
requestCounter.Add(ctx, 1,
    attribute.String("status", status),
    attribute.String("endpoint", endpoint),
)
```
When facing performance issues, you can combine the three data sources: inspect traces in the Jaeger UI to find slow spans, check resource metrics in Grafana, and collect Go profiles from the profiling server.
Example workflow:
```bash
# 1. Look at traces in Jaeger UI to identify slow spans
# 2. Check resource metrics in Grafana
# 3. Collect and analyze profiles
go tool pprof http://localhost:7777/profiler/profile
(pprof) top10  # Show the top 10 functions by CPU usage
(pprof) web    # Generate a graph visualization
```
For LLM-related issues, inspect the corresponding trace in the Langfuse UI, where each generation's input, output, and token usage are recorded.
To understand system resource usage:
```bash
# Collect a memory profile
go tool pprof http://localhost:7777/profiler/heap
(pprof) top10             # Show the top 10 functions by memory usage
(pprof) list someFunction # Show memory usage in a specific function
```