# Clusters Service
The Clusters Service manages cloud cluster lifecycle operations for roachprod-centralized, providing synchronized cluster state management across multiple cloud providers with distributed locking and health-aware coordination.
## Architecture

The clusters service is organized as a modular architecture with a clean separation of concerns:
```
pkg/cmd/roachprod-centralized/services/clusters/
├── service.go          # Service orchestration and lifecycle
├── api.go              # Public CRUD operations
├── sync.go             # Cloud sync with distributed locking
├── coordination.go     # Inter-service coordination helpers
├── registry.go         # Task type registration system
├── internal/           # Implementation details (encapsulated)
│   ├── operations/
│   │   └── operations.go   # CRUD operation implementations
│   └── scheduler/
│       └── scheduler.go    # Periodic refresh scheduling
├── tasks/              # Concrete task implementations
│   └── sync.go         # Cloud sync task
├── types/              # Public interfaces and DTOs
│   └── types.go        # IService interface and DTOs
└── mocks/              # Auto-generated test mocks
    └── clusters.go
```
### service.go

**Purpose:** Main service struct, lifecycle management, and background work coordination.

**Key Methods:**

- `NewService()` - Creates the service instance with cloud providers and options
- `RegisterTasks()` - Called during app initialization
- `StartService()` - Initializes the service (performs initial sync if needed)
- `StartBackgroundWork()` - Starts the periodic refresh scheduler
- `Shutdown()` - Graceful shutdown with WaitGroup synchronization

**Configuration:**
```go
type Options struct {
	WorkersEnabled          bool          // Enable background workers
	NoInitialSync           bool          // Skip cloud sync on startup
	PeriodicRefreshEnabled  bool          // Enable periodic cloud refresh
	PeriodicRefreshInterval time.Duration // Refresh interval (default: 5m)
}
```
**Background Work:**

```go
func (s *Service) StartBackgroundWork(ctx context.Context, l *logger.Logger, errChan chan<- error) error {
	// Start the periodic cloud sync scheduler (if enabled).
	scheduler.StartPeriodicRefresh(ctx, l, errChan, interval, s, onComplete)
	return nil
}
```
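For orientation, a minimal wiring sketch (not from the source: `providers`, `registry`, and the exact `NewService` arguments are illustrative) showing how the lifecycle methods fit together:

```go
// Hypothetical wiring performed by the application entry point.
svc := clusters.NewService(providers, clusters.Options{
	WorkersEnabled:          true,
	PeriodicRefreshEnabled:  true,
	PeriodicRefreshInterval: 5 * time.Minute,
})
svc.RegisterTasks(registry) // during app initialization

if err := svc.StartService(ctx, l); err != nil { // initial sync unless NoInitialSync
	return err
}

errChan := make(chan error, 1)
if err := svc.StartBackgroundWork(ctx, l, errChan); err != nil {
	return err
}

defer svc.Shutdown(ctx) // graceful shutdown with WaitGroup synchronization
```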
### api.go

**Purpose:** CRUD operations exposed via the `types.IService` interface.

**Methods:**

- `GetAllClusters(ctx, l, input)` - Query clusters with filters
- `GetCluster(ctx, l, input)` - Get a single cluster by name
- `RegisterCluster(ctx, l, input)` - Register an external creation of a cluster
- `RegisterClusterUpdate(ctx, l, input)` - Register an external update to an existing cluster
- `RegisterClusterDelete(ctx, l, input)` - Register an external deletion of a cluster
- `GetAllDNSZoneVMs(ctx, l, input)` - Get DNS zone VMs
- `SyncClouds(ctx, l)` - Trigger a manual cloud sync (creates a task)

**Used by:** Controllers (HTTP endpoints), CLI commands
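As a usage sketch (not from the source), a caller such as an HTTP controller might invoke the read API like this, using the `InputGetClusterDTO` and `ErrClusterNotFound` definitions listed in the API Reference below:

```go
// Minimal caller sketch; svc is any types.IService implementation.
cluster, err := svc.GetCluster(ctx, l, types.InputGetClusterDTO{Name: "my-cluster"})
if err != nil {
	if errors.Is(err, types.ErrClusterNotFound) {
		// e.g., map to HTTP 404 in a controller
	}
	return err
}
_ = cluster // render or serialize the result
```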
**CRUD Flow:**

```
API Call → Check Sync Status
              │
     ┌────────┴───────────┐
     │                    │
  No Sync        Sync In Progress
     │                    │
     ▼                    ▼
 Apply Now        Queue Operation
     │                    │
     └────────┬───────────┘
              ▼
   Trigger DNS Sync (if needed)
```
### sync.go

**Purpose:** Core cloud synchronization logic with distributed locking and operation replay.

**Key Methods:**

- `Sync(ctx, l)` - Main sync operation with lock management
- `ScheduleSyncTaskIfNeeded(ctx, l)` - Create a sync task (prevents duplicates)
- `acquireSyncLockWithHealthCheck(ctx, l, instanceID)` - Atomic lock acquisition

**Sync Algorithm** (see the annotated implementation in the Operation Replay section below):
```
1. Get instance ID from health service
2. Try to acquire distributed lock (atomic + health check)
   - If lock held by healthy instance   → Skip, return cached data
   - If lock held by unhealthy instance → Acquire lock
   - If lock free                       → Acquire lock
3. Clear stale operations from previous crashed syncs
4. List clusters from all cloud providers (GCE, AWS, Azure)
5. LOOP until no operations remain:
   a. Fetch pending operations WITH timestamp
   b. Apply operations to staging clusters (in-memory)
   c. Clear applied operations by timestamp
6. Atomically store clusters + release lock (single transaction)
7. Clear any remaining operations (arrived during final window)
8. Trigger DNS sync task
```
**Health-Aware Locking:** The lock acquisition is atomic and includes health verification to prevent stale locks:

```go
// Repository method checks lock state and owner health in a single transaction.
acquired, err := s._store.AcquireSyncLockWithHealthCheck(
	ctx, l, instanceID, healthTimeout,
)
```
**Benefits:**

- No check-then-acquire race: lock state and owner health are verified in a single transaction
- Locks held by crashed instances are reclaimed automatically once the owner's heartbeat goes stale
- No manual lock cleanup is required during normal operation
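On the repository side, the check-and-acquire can be a single `UPDATE` guarded by the owner's heartbeat. A minimal sketch, assuming the `cluster_sync_state` row from the Database Implementation section and a hypothetical `instance_health` heartbeat table:

```go
// Sketch only: acquire the sync lock if it is free, or if the current owner's
// last heartbeat is older than healthTimeout. instance_health is hypothetical.
func (r *Repo) AcquireSyncLockWithHealthCheck(
	ctx context.Context, l *logger.Logger, instanceID string, healthTimeout time.Duration,
) (bool, error) {
	res, err := r.db.ExecContext(ctx, `
		UPDATE cluster_sync_state
		   SET in_progress = true, instance_id = $1
		 WHERE id = 1
		   AND (in_progress = false
		        OR instance_id IN (SELECT instance_id
		                             FROM instance_health
		                            WHERE last_heartbeat < now() - ($2 * interval '1 second')))`,
		instanceID, healthTimeout.Seconds())
	if err != nil {
		return false, err
	}
	n, err := res.RowsAffected()
	return n == 1, err // acquired iff exactly one row was updated
}
```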
### coordination.go

**Purpose:** Inter-service coordination and data conversion.

**Methods:**

- `operationToOperationData(op)` - Convert `IOperation` → `OperationData` for storage
- `operationDataToOperation(opData)` - Convert `OperationData` → `IOperation` for replay
- `maybeEnqueuePublicDNSSyncTaskService(ctx, l)` - Trigger the DNS sync task
- `conditionalEnqueueOperationWithHealthCheck(ctx, l, operation)` - Queue an operation if a sync is active

**Operation Conversion:**
```go
// In-memory operation
op := operations.OperationCreate{Cluster: newCluster}

// Convert to persistable format
opData, err := s.operationToOperationData(op)
// opData contains: Type, ClusterName, ClusterData (JSON), Timestamp

// Later: replay from the queue
op, err := s.operationDataToOperation(opData)
err = op.ApplyOnStagingClusters(ctx, l, stagingClusters)
```
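The conversion itself can be little more than a type switch plus JSON marshaling. A minimal sketch, assuming the `OperationData` fields noted above (the `"create"` type tag and `[]byte` payload are illustrative):

```go
// Sketch only: convert an in-memory operation to its persistable form.
func (s *Service) operationToOperationData(op operations.IOperation) (OperationData, error) {
	switch o := op.(type) {
	case operations.OperationCreate:
		data, err := json.Marshal(o.Cluster) // ClusterData is stored as JSON
		if err != nil {
			return OperationData{}, err
		}
		return OperationData{
			Type:        "create", // illustrative type tag
			ClusterName: o.Cluster.Name,
			ClusterData: data,
			Timestamp:   time.Now(),
		}, nil
	// OperationUpdate and OperationDelete are handled the same way.
	default:
		return OperationData{}, fmt.Errorf("unknown operation type %T", op)
	}
}
```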
**Conditional Enqueueing:**

```go
// Only queue if a sync is active AND the syncing instance is healthy.
enqueued, err := s.conditionalEnqueueOperationWithHealthCheck(ctx, l, operation)
if !enqueued {
	// Apply directly to the repository.
	err = op.ApplyOnRepository(ctx, l, s._store)
}
```
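The helper itself reduces to a status check followed by a conditional insert. A minimal sketch, assuming the `GetSyncStatus` and `ConditionalEnqueueOperation` store methods seen elsewhere in this document (their exact signatures, and the `s.health.IsHealthy` lookup, are assumptions):

```go
// Sketch only: queue the operation iff a sync is active and its owner is healthy.
func (s *Service) conditionalEnqueueOperationWithHealthCheck(
	ctx context.Context, l *logger.Logger, op operations.IOperation,
) (bool, error) {
	status, err := s._store.GetSyncStatus(ctx, l)
	if err != nil {
		return false, err
	}
	if !status.InProgress || !s.health.IsHealthy(ctx, status.InstanceID) {
		return false, nil // caller applies the operation directly
	}
	opData, err := s.operationToOperationData(op)
	if err != nil {
		return false, err
	}
	// The store re-checks sync state inside the insert to close the race window.
	return s._store.ConditionalEnqueueOperation(ctx, l, opData)
}
```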
### registry.go

**Purpose:** Integration with the task system for async cloud syncs.

**Task Types:**

- `ClustersTaskSync` - Async cloud sync task

**Key Methods:**

- `GetTaskServiceName()` → `"clusters"`
- `GetHandledTasks()` → Map of task type to task implementation
- `CreateTaskInstance(taskType)` - Hydrate a task from its type string

**Example:**
```go
// Service registers its tasks
func (s *Service) RegisterTasks(taskRegistry IRegistry) {
	taskRegistry.RegisterTasksService(s)
}

// Service implements the ITasksService interface
func (s *Service) GetTaskServiceName() string {
	return "clusters"
}

func (s *Service) GetHandledTasks() map[string]types.ITask {
	return map[string]types.ITask{
		string(ClustersTaskSync): &TaskSync{Service: s},
	}
}

func (s *Service) CreateTaskInstance(taskType string) (tasks.ITask, error) {
	if taskType == string(ClustersTaskSync) {
		return &TaskSync{
			Task:    tasks.Task{Type: taskType},
			Service: s,
		}, nil
	}
	return nil, types.ErrUnknownTaskType
}
```
### internal/operations/

**Purpose:** Encapsulated CRUD operation implementations.

**Interface:**

```go
type IOperation interface {
	ApplyOnRepository(ctx, l, store) error         // Apply to database
	ApplyOnStagingClusters(ctx, l, clusters) error // Apply to in-memory cloud data
}
```
**Operation Types:**

```go
type OperationCreate struct{ Cluster cloudcluster.Cluster }
type OperationUpdate struct{ Cluster cloudcluster.Cluster }
type OperationDelete struct{ Cluster cloudcluster.Cluster }
```
**Usage:**

```go
// Create operation
op := operations.OperationCreate{Cluster: newCluster}

// Apply to repository
err := op.ApplyOnRepository(ctx, l, store)

// During sync: replay on staging data
err = op.ApplyOnStagingClusters(ctx, l, stagingClusters)
```
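The repository-side applies are thin wrappers over the store. A minimal sketch for the create case, assuming `StoreCluster` (which appears in the test example later in this document) accepts the cluster as its third argument:

```go
// Sketch only: persist a newly created cluster via the store.
func (o OperationCreate) ApplyOnRepository(
	ctx context.Context, l *logger.Logger, store IClustersRepository,
) error {
	return store.StoreCluster(ctx, l, o.Cluster)
}
```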
### internal/scheduler/

**Purpose:** Periodic cloud refresh scheduling.

**Interface:**

```go
type PeriodicRefreshScheduler interface {
	ScheduleSyncTaskIfNeeded(ctx, l) (tasks.ITask, error)
}

func StartPeriodicRefresh(
	ctx context.Context,
	l *logger.Logger,
	errChan chan<- error,
	refreshInterval time.Duration,
	scheduler PeriodicRefreshScheduler,
	onComplete func(),
)
```
**Behavior:**

- Calls `scheduler.ScheduleSyncTaskIfNeeded()` on each interval
- Uses `CreateTaskIfNotRecentlyScheduled` to prevent duplicate tasks
- Reports failures on `errChan`
- Invokes the `onComplete()` callback on exit (for `WaitGroup.Done`)

A sketch of the scheduler loop follows the flow below.

**Flow:**

```
Scheduler Ticker Fires (every PeriodicRefreshInterval)
        │
        ▼
ScheduleSyncTaskIfNeeded()
        │
        │ Check: Is there a pending/running sync task recently scheduled?
        │
        ├─ Yes → Skip (prevent duplicate)
        │
        └─ No → CreateTaskIfNotRecentlyScheduled()
                     │
                     ▼
               Task Enqueued
                     │
                     ▼
            Worker Picks Up Task
                     │
                     ▼
             TaskSync.Process()
                     │
                     ▼
              Service.Sync()
                     │
                     ▼
[Acquire Lock] → [List Cloud] → [Replay Ops] → [Store] → [Release Lock]
```
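A minimal sketch of what `StartPeriodicRefresh` might look like internally (the goroutine and ticker structure are assumptions, not taken from the source):

```go
// Sketch only: periodic scheduling loop.
func StartPeriodicRefresh(
	ctx context.Context,
	l *logger.Logger,
	errChan chan<- error,
	refreshInterval time.Duration,
	scheduler PeriodicRefreshScheduler,
	onComplete func(),
) {
	go func() {
		defer onComplete() // lets the service's WaitGroup unblock on shutdown
		ticker := time.NewTicker(refreshInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if _, err := scheduler.ScheduleSyncTaskIfNeeded(ctx, l); err != nil {
					errChan <- err // surface scheduling failures to the app
				}
			}
		}
	}()
}
```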
## Concurrent Operation Handling

**Scenario:** A user creates a cluster while a sync is in progress.
```
Time 0: Sync starts, acquires lock
        sync_status.InProgress = true
        sync_status.InstanceID = "instance-1"
        Clears stale operations from previous crashes

Time 1: Cloud listing begins

Time 2: User calls CreateCluster()
        ↓
        Check: Is sync in progress?
        ↓
        Yes → Check: Is syncing instance healthy?
        ↓
        Yes → Queue operation
              store.ConditionalEnqueueOperation(opData)
        ↓
        Return success to user

Time 3: Cloud listing completes
        ↓
        LOOP: Fetch operations WITH timestamp
        ↓
        Found operations (including user's create)
        ↓
        Replay operations on staging clusters
        ↓
        Clear applied operations by timestamp
        ↓
        Fetch again → No more operations
        ↓
        Exit loop

Time 4: Atomically store clusters + release lock
        ↓
        Clear final operations (if any arrived during steps 3-4)
        ↓
        Trigger DNS sync
```
**Race Condition Handling:**

**Scenario:** The syncing instance crashes mid-sync.

```
Time 0: Instance A starts sync, acquires lock
        sync_status.InProgress = true
        sync_status.InstanceID = "instance-a"
        health_service.instance-a.LastHeartbeat = now()

Time 1: Instance A crashes (no more heartbeats)

Time 2: Instance B attempts sync
        ↓
        acquireSyncLockWithHealthCheck("instance-b")
        ↓
        Repository checks:
        - Lock held by "instance-a"
        - Last heartbeat > healthTimeout (stale!)
        ↓
        Atomic operation:
        - Set sync_status.InstanceID = "instance-b"
        - Set sync_status.Timestamp = now()
        ↓
        Instance B acquires lock, proceeds with sync
```
## Operation Replay

During a cloud sync, we fetch fresh cluster state from the cloud providers (the source of truth). Any CRUD operations performed during the sync would be overwritten if we simply replaced the repository data. Operation replay solves this:
**Without Replay:**

```
User creates cluster X → Sync lists clouds (no X) → Store overwrites → Cluster X lost!
```

**With Replay:**

```
User creates cluster X → Queued as OperationCreate
Sync lists clouds → Replay OperationCreate(X) on staging data → Store includes X → Success!
```
The sync implementation (abridged; error handling elided for brevity):

```go
// 1. Clear stale operations from previous crashed syncs
staleCount, err := s._store.ClearPendingOperations(ctx, l)
if staleCount > 0 {
	l.Warn("cleared stale operations", slog.Int64("count", staleCount))
}

// 2. List fresh cloud data
err = s.roachprodCloud.ListCloud(crlLogger, vm.ListOptions{...})

// 3. Loop to apply all operations on staging clusters
totalOpsApplied := 0
for {
	// Fetch operations with a timestamp for safe clearing
	pendingOps, fetchTimestamp, err := s._store.GetPendingOperationsWithTimestamp(ctx, l)
	if len(pendingOps) == 0 {
		break
	}

	// Replay each operation on staging data
	for _, opData := range pendingOps {
		op, err := s.operationDataToOperation(opData)
		err = op.ApplyOnStagingClusters(ctx, l, s.roachprodCloud.Clusters)
	}
	totalOpsApplied += len(pendingOps)

	// Clear only the operations we just applied
	clearedCount, err := s._store.ClearPendingOperationsBefore(ctx, l, fetchTimestamp)
	l.Debug("cleared applied operations", slog.Int64("count", clearedCount))
}

// 4. Atomically store merged result + release lock (single transaction)
err = s._store.StoreClustersAndReleaseSyncLock(ctx, l, s.roachprodCloud.Clusters, instanceID)

// 5. Clear any remaining operations (arrived during the final window)
remainingCount, err := s._store.ClearPendingOperations(ctx, l)
if remainingCount > 0 {
	l.Warn("cleared operations from final window", slog.Int64("count", remainingCount))
}
```
**Staging Apply Implementations:**

`OperationCreate`:

```go
func (o OperationCreate) ApplyOnStagingClusters(ctx, l, clusters) error {
	clusters[o.Cluster.Name] = &o.Cluster // Add to staging
	return nil
}
```

`OperationUpdate`:

```go
func (o OperationUpdate) ApplyOnStagingClusters(ctx, l, clusters) error {
	clusters[o.Cluster.Name] = &o.Cluster // Replace in staging
	return nil
}
```

`OperationDelete`:

```go
func (o OperationDelete) ApplyOnStagingClusters(ctx, l, clusters) error {
	delete(clusters, o.Cluster.Name) // Remove from staging
	return nil
}
```
## Configuration

```go
type Options struct {
	WorkersEnabled          bool          // Enable background workers (default: true)
	NoInitialSync           bool          // Skip sync on startup (default: false)
	PeriodicRefreshEnabled  bool          // Enable periodic refresh (default: true)
	PeriodicRefreshInterval time.Duration // Refresh interval (default: 5m)
}
```
```bash
# Periodic refresh interval
export ROACHPROD_CLUSTERS_REFRESH_INTERVAL=10m

# Disable periodic refresh
roachprod-centralized api --no-periodic-refresh

# Disable all background workers
roachprod-centralized api --no-workers
```
## Cloud Provider Configuration

The clusters service integrates with roachprod's cloud provider system. Configure providers in `cloud.yaml`:
```yaml
gce:
  enabled: true
  project: my-gce-project
  credentials: /path/to/gce-creds.json

aws:
  enabled: true
  regions: [us-east-1, us-west-2]
  credentials: /path/to/aws-creds

azure:
  enabled: true
  subscription: my-subscription-id
  credentials: /path/to/azure-creds
```
## Testing

The clusters service includes comprehensive test coverage: 24 test functions, 44+ test cases, 100% pass rate.

**Example:**
```go
func TestService_CreateCluster(t *testing.T) {
	tests := []struct {
		name     string
		input    types.InputCreateClusterDTO
		mockFunc func(*clustersrepomock.IClustersRepository)
		want     *cloudcluster.Cluster
		wantErr  error
	}{
		{
			name: "success",
			input: types.InputCreateClusterDTO{
				Cluster: cloudcluster.Cluster{Name: "new-cluster"},
			},
			mockFunc: func(repo *clustersrepomock.IClustersRepository) {
				repo.On("GetCluster", mock.Anything, mock.Anything, "new-cluster").
					Return(cloudcluster.Cluster{}, types.ErrClusterNotFound)
				repo.On("StoreCluster", mock.Anything, mock.Anything, mock.Anything).
					Return(nil)
				repo.On("GetSyncStatus", mock.Anything, mock.Anything).
					Return(&clusters.SyncStatus{InProgress: false}, nil)
			},
			want: &cloudcluster.Cluster{Name: "new-cluster"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// ... test implementation
		})
	}
}
```
## Performance

Each sync calls cloud provider APIs:

- One listing pass per configured provider (GCE, AWS, Azure) per sync

**Rate Limit Mitigation:**

- The distributed sync lock ensures at most one instance lists the clouds at a time
- `CreateTaskIfNotRecentlyScheduled` prevents duplicate sync tasks from piling up
- `PeriodicRefreshInterval` is configurable, so listing frequency can be tuned down

**Read Operations:** served from the repository (database); the read path makes no cloud API calls.

**Write Operations:** applied directly to the repository, or queued for replay while a sync is in progress (see Conditional Enqueueing above).
## Troubleshooting

### Sync Not Running

**Symptoms:** Cloud clusters not appearing in the database

**Causes:**

- No worker instances running (`--no-workers` mode)
- Periodic refresh disabled
- Sync lock stuck (see below)

**Solution:**

```bash
# Check worker instances
roachprod-centralized workers

# Check sync status
roachprod-centralized clusters sync-status

# Force manual sync
roachprod-centralized clusters sync
```
### Sync Lock Stuck

**Symptoms:** Sync never runs, the lock is always held

**Cause:** The syncing instance crashed without releasing the lock, or the health check timeout is too long

**Solution:**

```sql
-- Check lock status
SELECT * FROM sync_status WHERE service = 'clusters';

-- The health check releases the lock automatically after the timeout.
-- Or manually release (emergency only):
UPDATE sync_status SET in_progress = false WHERE service = 'clusters';
```
### Operations Queue Growing

**Symptoms:** `cluster_operations` table growing, never cleared

**Cause:** Sync consistently failing or not running

**Automatic Cleanup:** Each sync clears stale operations at startup, so this should be rare.

**Solution:**

```sql
-- Check pending operations
SELECT * FROM cluster_operations ORDER BY created_at DESC;
```

```bash
# Check for stale operations in logs
tail -f /var/log/roachprod-centralized/app.log | grep "cleared stale operations"

# Check sync errors in logs
tail -f /var/log/roachprod-centralized/app.log | grep "sync"
```

```sql
-- Manual cleanup (emergency only; the system handles this automatically):
DELETE FROM cluster_operations WHERE created_at < now() - INTERVAL '1 hour';
```
### Slow CRUD Operations During Sync

**Symptoms:** `CreateCluster`/`UpdateCluster` calls taking longer than usual

**Cause:** Operations are queued and wait for the sync to complete and replay them

**Expected:** Operations are queued instantly, but the final state is not visible until the sync completes

**Solution:** This is normal behavior. If it is problematic, reduce the sync frequency or optimize cloud API calls.
## API Reference

### DTOs

```go
type InputGetAllClustersDTO struct {
	Filters filters.FilterSet
}

type InputGetClusterDTO struct {
	Name string
}

type InputRegisterClusterDTO struct {
	Cluster cloudcluster.Cluster
}

type InputRegisterClusterUpdateDTO struct {
	Cluster cloudcluster.Cluster
}

type InputRegisterClusterDeleteDTO struct {
	Cluster cloudcluster.Cluster
}

type InputGetAllDNSZoneVMsDTO struct {
	Filters filters.FilterSet
}
```
### Errors

```go
var (
	ErrClusterNotFound      = errors.New("cluster not found")
	ErrClusterAlreadyExists = errors.New("cluster already exists")
	ErrShutdownTimeout      = errors.New("shutdown timeout")
)
```
### IService Interface

```go
type IService interface {
	// Lifecycle
	StartService(ctx context.Context, l *logger.Logger) error
	StartBackgroundWork(ctx context.Context, l *logger.Logger, errChan chan<- error) error
	Shutdown(ctx context.Context) error
	RegisterTasks(ctx context.Context, l *logger.Logger, taskRegistry IRegistry) error

	// Task Registry
	GetTaskServiceName() string
	GetHandledTasks() map[string]ITask
	CreateTaskInstance(taskType string) (ITask, error)

	// CRUD Operations
	GetAllClusters(ctx, l, input) (Clusters, error)
	GetCluster(ctx, l, input) (*Cluster, error)
	RegisterCluster(ctx, l, input) (*Cluster, error)
	RegisterClusterUpdate(ctx, l, input) (*Cluster, error)
	RegisterClusterDelete(ctx, l, input) error
	GetAllDNSZoneVMs(ctx, l, input) (DNSZoneVMs, error)

	// Sync
	SyncClouds(ctx, l) (ITask, error) // Async via task system
	Sync(ctx, l) (Clusters, error)    // Synchronous direct sync
}
```
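A compile-time assertion is a cheap way to keep `Service` and this interface from drifting apart; a standard Go idiom (not taken from the source), placed in the `clusters` package:

```go
// Compile-time check that *Service satisfies types.IService.
var _ types.IService = (*Service)(nil)
```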
## Repository Methods

The clusters service relies on several specialized repository methods for operation management:
```go
// Fetch operations with a timestamp for safe clearing
GetPendingOperationsWithTimestamp(ctx, l) ([]OperationData, time.Time, error)

// Clear operations created before a specific timestamp.
// Returns the number of operations deleted.
ClearPendingOperationsBefore(ctx, l, timestamp) (int64, error)

// Clear all pending operations.
// Returns the number of operations deleted.
ClearPendingOperations(ctx, l) (int64, error)
```
**Usage Pattern:**

```go
// Fetch with timestamp
ops, fetchTime, err := store.GetPendingOperationsWithTimestamp(ctx, l)

// Apply operations...

// Clear only what was fetched
count, err := store.ClearPendingOperationsBefore(ctx, l, fetchTime)
l.Debug("cleared operations", slog.Int64("count", count))
```
```go
// Atomically store clusters and release the sync lock in a single transaction
StoreClustersAndReleaseSyncLock(ctx, l, clusters, instanceID) error
```
**Why Atomic?** If storing clusters and releasing the lock were separate steps, a crash between them could leave either the lock held forever with new data stored, or the lock released with stale data. A single transaction guarantees the cluster state and the lock release land together or not at all.
Database Implementation:
BEGIN;
DELETE FROM clusters;
INSERT INTO clusters (name, data) VALUES (...);
UPDATE cluster_sync_state
SET in_progress = false, instance_id = NULL
WHERE id = 1 AND instance_id = $1;
COMMIT;
*Last Updated: October 3, 2025*