workhorse/internal/ai_assist/duoworkflow/README.md
This package implements Workhorse's support for AI-assisted features through integration with the Duo Workflow Service. It provides WebSocket proxying, gRPC communication, and action handling for AI workflows.
The duoworkflow package enables GitLab's AI-assisted features (Duo Chat, Duo Agent) by:
The Handler manages WebSocket connections and provides graceful shutdown:
```go
type Handler struct {
	rails    *api.API
	rdb      *redis.Client
	backend  http.Handler
	upgrader websocket.Upgrader
	runners  sync.Map // map[*runner]bool
}
```
Key responsibilities:
- Creates and tracks `runner` instances for each connection

Usage:

```go
handler := NewHandler(rails, rdb, backend)
http.Handle("/ai/duoworkflow", handler.Build())
```
The Client manages gRPC communication with the Duo Workflow Service:
```go
type Client struct {
	grpcConn   *grpc.ClientConn
	grpcClient pb.DuoWorkflowClient
	headers    map[string]string
}
```
Key features:
Configuration:
The runner orchestrates a single workflow execution:
```go
type runner struct {
	rails              *api.API
	backend            http.Handler
	token              string
	originalReq        *http.Request
	conn               websocketConn
	streamManager      *streamManager
	lockManager        *workflowLockManager
	workflowID         string
	mutex              *redsync.Mutex
	lockFlow           bool
	serverCapabilities []string
	mcpManager         mcpManager
	workflowDefinition string
}
```
Responsibilities:
Message handling flow:
The streamManager manages gRPC streams to the Duo Workflow Service:
```go
type streamManager struct {
	wf                 workflowStream
	client             *Client
	cloudServiceClient *Client
	cloudServiceStream selfHostedWorkflowStream
	originalReq        *http.Request
	sendMu             sync.Mutex
}
```
Responsibilities:
- Opens the `ExecuteWorkflow` gRPC stream
- Optionally opens a `TrackSelfHostedExecuteWorkflow` stream to the cloud service for self-hosted deployments
- Guards `Send` with `sendMu` to allow concurrent goroutines to write safely
- Translates `io.EOF` and quota-exceeded gRPC errors into sentinel errors on `Recv`

The `runHTTPActionHandler` executes HTTP requests to the GitLab API:
```go
type runHTTPActionHandler struct {
	rails       *api.API
	backend     http.Handler
	token       string
	originalReq *http.Request
	action      *pb.Action
}
```
Process:
Security features:
The MCP manager handles communication with Model Context Protocol servers:
```go
type mcpManager interface {
	Tools() []*pb.McpTool
	PreApprovedTools() []string
	HasTool(name string) bool
	CallTool(ctx context.Context, action *pb.Action) (*pb.ClientEvent, error)
	Close() error
}
```
Features:
For self-managed instances, distributed workflow locking prevents concurrent execution:
```go
type workflowLockManager struct {
	rdb *redis.Client
}
```
Process:
```
WebSocket ClientEvent
    ↓
Unmarshal JSON to Protocol Buffer
    ↓
streamManager.Send
    ↓
Duo Workflow Service receives ClientEvent
```
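The client-to-service step above (decode the WebSocket message, then pass it to `streamManager.Send`) is sketched below with `encoding/json`. The event shape is a hypothetical, heavily simplified mirror of `pb.ClientEvent`: the field names `startRequest`, `actionResponse`, and their members are assumptions. Real code decodes into the generated protobuf type, typically with `protojson.Unmarshal`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical, simplified mirror of pb.ClientEvent.
type startRequest struct {
	WorkflowID string `json:"workflowID"`
	Goal       string `json:"goal"`
}

type actionResponse struct {
	RequestID string `json:"requestID"`
	Response  string `json:"response"`
}

type clientEvent struct {
	StartRequest   *startRequest   `json:"startRequest,omitempty"`
	ActionResponse *actionResponse `json:"actionResponse,omitempty"`
}

// eventSender is the subset of streamManager this step needs.
type eventSender interface {
	Send(*clientEvent) error
}

// forward decodes one WebSocket text message and hands it to the gRPC stream.
func forward(s eventSender, message []byte) error {
	var ev clientEvent
	if err := json.Unmarshal(message, &ev); err != nil {
		return fmt.Errorf("decode client event: %w", err)
	}
	return s.Send(&ev)
}

// recordingSender stands in for streamManager in this example.
type recordingSender struct{ got []*clientEvent }

func (r *recordingSender) Send(ev *clientEvent) error {
	r.got = append(r.got, ev)
	return nil
}

func main() {
	s := &recordingSender{}
	err := forward(s, []byte(`{"startRequest":{"workflowID":"42","goal":"Fix the failing pipeline"}}`))
	fmt.Println(err, s.got[0].StartRequest.WorkflowID)
}
```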
```
gRPC Action received (streamManager.Recv)
    ↓
Determine action type
    ↓
Execute action (HTTP request or MCP tool)
    ↓
Create ActionResponse
    ↓
streamManager.Send
    ↓
Receive response in gRPC stream
    ↓
Marshal to JSON
    ↓
Send to WebSocket
    ↓
Client receives action response
```
The package exposes four Prometheus counters.
`gitlab_workhorse_duo_workflow_connections_total`: incremented for every inbound request that passes pre-authorization, before the WebSocket upgrade is attempted. This includes requests that subsequently fail to upgrade.
`gitlab_workhorse_duo_workflow_connection_errors_total`: incremented whenever a connection fails at any stage, labelled by `error_type`:
| Stage | Trigger | `error_type` |
|---|---|---|
| WebSocket upgrade | `websocket.Upgrader.Upgrade` returns an error | `other` |
| Runner initialisation | `newRunner` / `newStreamManager` returns an error | `other` |
| Runner execution | Usage quota exceeded | `quota_exceeded` |
| Runner execution | Workflow lock cannot be acquired | `locked` |
| Runner execution | Any other `runner.Execute` error | `other` |
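The ratio `connection_errors_total / connections_total` gives the connection error rate, for example `sum(rate(gitlab_workhorse_duo_workflow_connection_errors_total[5m])) / sum(rate(gitlab_workhorse_duo_workflow_connections_total[5m]))`.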
Example query to break down errors by type:
```promql
sum by (error_type) (rate(gitlab_workhorse_duo_workflow_connection_errors_total[5m]))
```
`gitlab_workhorse_duo_workflow_sessions_total`: incremented each time a gRPC `ExecuteWorkflow` stream is successfully opened to the Duo Workflow Service (inside `newStreamManager`).
`gitlab_workhorse_duo_workflow_session_errors_total`: incremented for every non-EOF error received on the `ExecuteWorkflow` stream, labelled by `grpc_code`, the gRPC status code string (e.g. `Internal`, `Unavailable`, `ResourceExhausted`). `io.EOF` is the normal end-of-stream signal and does not increment this counter.
Example query:
```promql
sum by (grpc_code) (rate(gitlab_workhorse_duo_workflow_session_errors_total[5m]))
```
Workhorse receives configuration during pre-authorization:
```go
type DuoWorkflow struct {
	Service                   *DuoWorkflowServiceConfig // Primary gRPC service
	CloudServiceForSelfHosted *DuoWorkflowServiceConfig // Optional cloud tracking service
	LockConcurrentFlow        bool                      // Enable workflow locking
	McpServers                map[string]*McpServerConfig
}

type DuoWorkflowServiceConfig struct {
	URI     string            // gRPC service URI (e.g., "localhost:50052")
	Headers map[string]string // Headers for gRPC requests (e.g., OAuth token)
	Secure  bool              // Use TLS for gRPC connection
}
```
MCP servers are configured in GitLab Rails and passed to Workhorse:
```ruby
{
  gitlab: {
    # URL is automatically resolved in Workhorse
    Headers: { "Authorization" => "Bearer token" },
    Tools: ["tool1", "tool2"], # Empty means all tools
    PreApprovedTools: ["tool1"]
  },
  external_server: {
    URL: "https://mcp-server.example.com",
    Headers: { "Authorization" => "Bearer token" },
    Tools: []
  }
}
```
- … `ErrServerUnavailable` and closes the WebSocket
- A `RESOURCE_EXHAUSTED` error with a `USAGE_QUOTA_EXCEEDED` message is translated to `errUsageQuotaExceededError`, and the WebSocket is closed with `ClosePolicyViolation`

During server shutdown:
- Sends `StopWorkflow` requests to the Duo Workflow Service

Test files are located in the same directory. Run them from the `workhorse` directory:

```shell
go test ./internal/ai_assist/duoworkflow/... -v
```