pkg/cmd/roachprod-centralized/docs/services/TASKS.md
The Tasks Service provides a distributed, fault-tolerant background task processing system for roachprod-centralized. It handles asynchronous operations like cluster synchronization, DNS updates, and health checks.
The tasks service has been refactored into a modular architecture with clean separation of concerns:
pkg/cmd/roachprod-centralized/services/tasks/
├── service.go # Service orchestration and lifecycle
├── api.go # Public CRUD operations
├── coordination.go # Inter-service coordination helpers
├── registry.go # Task type registration system
├── operations.go # Business operations (called by tasks)
├── internal/ # Implementation details (encapsulated)
│ ├── processor/
│ │ ├── processor.go # Worker pool and queue management
│ │ └── executor.go # Task execution with timeouts
│ ├── scheduler/
│ │ ├── scheduler.go # Base scheduler interface
│ │ └── purge_scheduler.go # Periodic purge scheduling
│ └── metrics/
│ └── collector.go # Metrics collection
├── tasks/ # Concrete task implementations
│ └── purge.go # Task purge implementation
├── types/ # Public interfaces and DTOs
│ └── types.go # IService, ITask, ITasksService interfaces
└── mocks/ # Auto-generated test mocks
└── tasks.go
service.go
Purpose: Main service struct, lifecycle management, and coordination of all components.
Key Methods:
- NewService() - Creates a service instance with options
- RegisterTasks() - Called during app initialization
- StartService() - Initializes the service (performs an initial sync if needed)
- StartBackgroundWork() - Starts processors, schedulers, and metrics
- Shutdown() - Graceful shutdown with WaitGroup synchronization
Background Work:
func (s *Service) StartBackgroundWork(ctx context.Context, l *logger.Logger) error {
// Simplified: setup of errChan, worker count, intervals, and onComplete callbacks is omitted.
// 1. Start task processor (workers poll and execute tasks)
processor.StartProcessing(ctx, l, errChan, workers, instanceID, repo, s)
// 2. Start purge scheduler (if workers enabled)
scheduler.StartPurgeScheduling(ctx, l, errChan, interval, s, onComplete)
// 3. Start metrics collection (if enabled)
metrics.StartMetricsCollection(ctx, l, errChan, interval, s, onComplete)
return nil
}
api.go
Purpose: CRUD operations exposed via the types.IService interface.
Methods:
- GetTasks(ctx, l, input) - Query tasks with filters
- GetTask(ctx, l, input) - Get a single task by ID
- CreateTask(ctx, l, task) - Create a new task
Used by: Controllers (HTTP endpoints)
coordination.go
Purpose: Helper methods that other services use to coordinate task scheduling.
Methods:
- CreateTaskIfNotAlreadyPlanned(ctx, l, task) - Prevents duplicate ad-hoc tasks
- CreateTaskIfNotRecentlyScheduled(ctx, l, task, window) - Prevents duplicate periodic tasks
- WaitForTaskCompletion(ctx, l, taskID, timeout) - Blocks until the task completes
- GetMostRecentCompletedTaskOfType(ctx, l, taskType) - Query helper
Used by: Clusters service, Health service, Public DNS service
registry.go
Purpose: Lets services register their task types and enables task hydration.
Key Concepts:
- Hydration converts base tasks (tasks.Task) into concrete types with service references
Example:
// Service registers its tasks
func (s *ClustersService) RegisterTasks(tasksService types.ITasksService) {
tasksService.RegisterTasksService(s)
}
// Service implements ITasksService interface
func (s *ClustersService) GetTaskServiceName() string {
return "clusters"
}
func (s *ClustersService) GetHandledTasks() map[string]types.ITask {
return map[string]types.ITask{
"cluster_sync": &TaskSync{},
}
}
func (s *ClustersService) CreateTaskInstance(taskType string) (tasks.ITask, error) {
if taskType == "cluster_sync" {
return NewTaskSync(s.clustersService), nil
}
return nil, types.ErrUnknownTaskType
}
Hydration Flow:
1. A worker loads a base task (tasks.Task) from the database
2. The worker calls HydrateTask(baseTask) on the tasks service
3. The tasks service calls the registered service's CreateTaskInstance(taskType) to get the concrete type with its dependencies
operations.go
Purpose: Business operations that task implementations can call.
Methods:
- PurgeTasks(ctx, l) - Purges old done and failed tasks
- purgeTasksInState(ctx, l, duration, state) - Helper for state-specific purging
Used by: TaskPurge implementation
internal/processor/
Purpose: Task queue management and execution.
Components:
- processor.go - Worker pool, polling, queue coordination
- executor.go - Individual task execution with timeout handling
Interface:
type TaskExecutor interface {
HydrateTask(base mtasks.ITask) (types.ITask, error)
MarkTaskAs(ctx, l, id, status) error
UpdateError(ctx, l, id, errMsg) error
GetManagedTask(taskType string) types.ITask
GetMetricsEnabled() bool
IncrementProcessedTasks()
GetDefaultTimeout() types.TimeoutGetter
}
internal/scheduler/
Purpose: Periodic task scheduling.
Components:
- scheduler.go - Base scheduler interface
- purge_scheduler.go - Schedules periodic purge tasks
Key Features:
- Uses CreateTaskIfNotRecentlyScheduled to avoid creating duplicate periodic tasks
internal/metrics/
Purpose: Prometheus metrics collection.
Components:
- collector.go - Periodic statistics updates
Metrics Exported: See the Metrics Documentation for a complete reference of all exposed metrics.
Tasks flow through a simple state machine:
┌─────────────────────────┐
│ Task Created │
│ (via API or scheduler) │
└───────────┬─────────────┘
│
▼
┌────────────┐
│ pending │ ◄─── Tasks wait here until claimed by worker
└──────┬─────┘
│
│ Worker claims via GetTasksForProcessing()
▼
┌────────────┐
│ running │ ◄─── Worker actively processing
└──────┬─────┘
│
┌───────────┴────────────┐
│ │
│ Success │ Error/Timeout
▼ ▼
┌────────────┐ ┌─────────────┐
│ done │ │ failed │ ◄─── Terminal states
└────────────┘ └─────────────┘ (no auto-retry)
State Transitions:
- pending → running: A worker claims the task
- running → done: Task execution succeeds
- running → failed: Task execution returns an error or times out
Note: Failed tasks are not retried automatically. Retry logic must be implemented at the application level.
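The diagram can be encoded as a transition table. This sketch only mirrors the documented states; whether the service validates transitions this way is an assumption, and transition is a hypothetical helper.

```go
package main

import "fmt"

type State string

const (
	Pending State = "pending"
	Running State = "running"
	Done    State = "done"
	Failed  State = "failed"
)

// allowed lists the only legal transitions. Done and Failed have no outgoing
// edges, which is why failed tasks are never retried automatically.
var allowed = map[State][]State{
	Pending: {Running},
	Running: {Done, Failed},
}

// transition returns an error if moving from -> to is not a legal edge.
func transition(from, to State) error {
	for _, next := range allowed[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("invalid transition %s -> %s", from, to)
}

func main() {
	fmt.Println(transition(Pending, Running)) // <nil>
	fmt.Println(transition(Failed, Pending))  // invalid transition failed -> pending
}
```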
package mytasks
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/models/tasks"
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/utils/logger"
)
const (
MyTaskType = "my_task"
)
type MyTask struct {
tasks.Task // Embed base task
myService IMyService // Service dependency
}
func NewMyTask(service IMyService) *MyTask {
return &MyTask{
Task: tasks.Task{
Type: MyTaskType,
},
myService: service,
}
}
func (t *MyTask) Process(ctx context.Context, l *logger.Logger) error {
l.Info("Starting my task", "task_id", t.ID)
if err := t.myService.DoSomething(ctx, l); err != nil {
l.Error("Task failed", "error", err)
return err
}
l.Info("Task completed successfully")
return nil
}
func (t *MyTask) GetTimeout() time.Duration {
return 5 * time.Minute // Custom timeout for this task type
}
In your service:
func (s *MyService) RegisterTasks(tasksService types.ITasksService) {
tasksService.RegisterTasksService(s)
}
func (s *MyService) GetTaskServiceName() string {
return "myservice"
}
func (s *MyService) GetHandledTasks() map[string]types.ITask {
return map[string]types.ITask{
MyTaskType: &MyTask{}, // Template instance
}
}
func (s *MyService) CreateTaskInstance(taskType string) (tasks.ITask, error) {
switch taskType {
case MyTaskType:
return NewMyTask(s), nil // Create with service dependency
default:
return nil, types.ErrUnknownTaskType
}
}
// Create and enqueue the task
task := NewMyTask(myService)
createdTask, err := tasksService.CreateTask(ctx, l, task)
if err != nil {
return err
}
// Optionally wait for completion
if err := tasksService.WaitForTaskCompletion(ctx, l, createdTask.GetID(), 10*time.Minute); err != nil {
return err
}
Tasks can store typed options using the generic TaskWithOptions[T] type. This provides type-safe access to task parameters with automatic JSON marshaling.
// 1. Define your options struct
type MyTaskOptions struct {
Param1 string `json:"param1"`
Param2 int `json:"param2"`
Timeout time.Duration `json:"timeout"`
}
// 2. Use TaskWithOptions[T] generic type
type MyTask struct {
mtasks.TaskWithOptions[MyTaskOptions] // Embed generic task with options
Service IMyService // Service dependency
}
// 3. Create task with SetOptions
func NewMyTask(service IMyService, param1 string, param2 int) (*MyTask, error) {
task := &MyTask{Service: service}
task.Type = MyTaskType
// SetOptions automatically marshals to JSON
if err := task.SetOptions(MyTaskOptions{
Param1: param1,
Param2: param2,
Timeout: 5 * time.Minute,
}); err != nil {
return nil, err
}
return task, nil
}
// 4. Access options with GetOptions
func (t *MyTask) Process(ctx context.Context, l *logger.Logger) error {
// GetOptions returns typed options (no unmarshaling needed!)
opts := t.GetOptions()
l.Info("Processing task",
"param1", opts.Param1,
"param2", opts.Param2,
"timeout", opts.Timeout,
)
// Use typed options directly
return t.Service.DoSomethingWith(ctx, l, opts.Param1, opts.Param2)
}
// From services/health/tasks/tasks.go
type TaskCleanupOptions struct {
InstanceTimeout time.Duration `json:"instance_timeout"`
CleanupRetention time.Duration `json:"cleanup_retention"`
}
type TaskCleanup struct {
mtasks.TaskWithOptions[TaskCleanupOptions]
Service types.IHealthService
}
func NewTaskCleanup(instanceTimeout, cleanupRetention time.Duration) (*TaskCleanup, error) {
task := &TaskCleanup{}
task.Type = string(HealthTaskCleanup)
if err := task.SetOptions(TaskCleanupOptions{
InstanceTimeout: instanceTimeout,
CleanupRetention: cleanupRetention,
}); err != nil {
return nil, err
}
return task, nil
}
func (t *TaskCleanup) Process(ctx context.Context, l *logger.Logger) error {
opts := t.GetOptions() // Type-safe access!
deletedCount, err := t.Service.CleanupDeadInstances(
ctx, l,
opts.InstanceTimeout,
opts.CleanupRetention,
)
if err != nil {
return err
}
_ = deletedCount // Use result as needed
return nil
}
The TaskWithOptions[T] generic type:
type TaskWithOptions[T any] struct {
Task // Embed base task
Options T `json:"-"` // Typed options (not directly serialized)
}
// GetOptions returns typed options (reconstructed from Payload)
func (t *TaskWithOptions[T]) GetOptions() *T {
return &t.Options
}
// SetOptions marshals to JSON and stores in Payload field
func (t *TaskWithOptions[T]) SetOptions(opts T) error {
t.Options = opts
// Auto-marshal to JSON and store in task.Payload
data, err := json.Marshal(opts)
if err != nil {
return err
}
t.Payload = data
return nil
}
Why Hydration?
When tasks are stored in the database, they lose their service references. Hydration reconstructs the task with its dependencies:
Database Task (serialized)
│
│ GetTasksForProcessing()
▼
tasks.Task (base type)
│
│ HydrateTask()
▼
Concrete Task with Services
│
│ Process()
▼
Execution
Example:
// 1. Task stored in DB as tasks.Task
dbTask := &tasks.Task{
ID: uuid.MakeV4(),
Type: "cluster_sync",
}
// 2. Worker hydrates it
concreteTask, err := tasksService.HydrateTask(dbTask)
// Returns: &TaskSync{Task: *dbTask, clustersService: ...}
if err != nil {
return err
}
// 3. Now has service dependency and can execute
err = concreteTask.Process(ctx, l)
type Options struct {
Workers int // Number of concurrent workers (default: 1)
WorkersEnabled bool // Enable task processing (default: true)
DefaultTasksTimeout time.Duration // Default task timeout (default: 30s)
PurgeDoneTaskOlderThan time.Duration // Purge done tasks after (default: 2h)
PurgeFailedTaskOlderThan time.Duration // Purge failed tasks after (default: 24h)
PurgeTasksInterval time.Duration // How often to purge (default: 10m)
CollectMetrics bool // Enable metrics collection (default: true)
StatisticsUpdateInterval time.Duration // Metrics update frequency (default: 30s)
}
# Task processing
export ROACHPROD_TASKS_WORKERS=3
# Disable workers (API-only mode)
roachprod-centralized api --no-workers
The tasks service is covered by unit tests (36 in total, all passing as of the last update). For example:
func TestCreateTaskIfNotAlreadyPlanned(t *testing.T) {
ctx := context.Background()
mockRepo := &tasksrepomock.ITasksRepository{}
taskService := NewService(mockRepo, "test-instance", Options{})
// Setup: no existing pending tasks
mockRepo.On("GetTasks", ctx, mock.Anything, mock.Anything).Return([]tasks.ITask{}, nil)
mockRepo.On("CreateTask", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// Execute
task := &MockTask{Task: tasks.Task{Type: "test"}}
createdTask, err := taskService.CreateTaskIfNotAlreadyPlanned(ctx, logger.DefaultLogger, task)
// Assert
assert.NoError(t, err)
assert.NotNil(t, createdTask)
mockRepo.AssertExpectations(t)
}
Symptoms: Tasks created but never processed
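Causes:
- No instance is running workers (for example, every instance was started in --no-workers mode)
- The task type is not registered with the tasks service
Solution: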
# Check worker instances
roachprod-centralized workers
# Check task registration
# Verify service implements ITasksService and calls RegisterTasksService
Symptoms: Tasks fail with timeout error
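Causes:
- The task runs longer than its timeout: its own GetTimeout() value, or DefaultTasksTimeout if it does not define one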
Solution:
// Increase timeout for specific task type
func (t *MyTask) GetTimeout() time.Duration {
return 10 * time.Minute // Longer timeout
}
// Or increase default timeout
tasksService := NewService(repo, instanceID, Options{
DefaultTasksTimeout: 5 * time.Minute,
})
Symptoms: Service hangs during shutdown
Cause: Background goroutines not signaling completion
Solution: Verify that every background routine calls WaitGroup.Done() or the onComplete callback when it exits.
Last Updated: October 2, 2025