docs/plans/2025-12-30-task-scheduler-available-status.md
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add AVAILABLE status to task runs, separating "ready to execute" from "actively executing" to prepare for HA.
Architecture: PENDING → AVAILABLE → RUNNING flow. pending_scheduler handles all gating (RunAt, version ordering, database locks, parallel limits) and promotes to AVAILABLE. running_scheduler simply claims AVAILABLE tasks and executes.
Tech Stack: Go, PostgreSQL, Protocol Buffers, TypeScript/Vue
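To make the intended flow concrete, the lifecycle can be sketched as a transition table. This is an illustrative sketch only: the helper and string statuses are hypothetical, and the exact set of legal edges (e.g. which states may be canceled or skipped) should be confirmed against the real scheduler.

```go
package main

import "fmt"

// allowedTransitions sketches the lifecycle described above:
// PENDING -> AVAILABLE -> RUNNING, plus terminal states.
// The cancel/skip edges are assumptions, not confirmed behavior.
var allowedTransitions = map[string][]string{
	"PENDING":   {"AVAILABLE", "CANCELED", "SKIPPED"},
	"AVAILABLE": {"RUNNING", "CANCELED"},
	"RUNNING":   {"DONE", "FAILED", "CANCELED"},
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to string) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("PENDING", "AVAILABLE")) // true
	fmt.Println(canTransition("PENDING", "RUNNING"))   // false: must pass through AVAILABLE
}
```

The key property the plan relies on is that RUNNING is only reachable through AVAILABLE, so all gating happens before the running scheduler ever sees a task.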
Files:
- proto/store/store/task_run.proto:13-29

Step 1: Add AVAILABLE enum value
In proto/store/store/task_run.proto, add AVAILABLE after SKIPPED:
```protobuf
enum Status {
  STATUS_UNSPECIFIED = 0;
  // Task run is queued, waiting for scheduling constraints to clear.
  PENDING = 1;
  // Task run is currently executing.
  RUNNING = 2;
  // Task run completed successfully.
  DONE = 3;
  // Task run encountered an error and failed.
  FAILED = 4;
  // Task run was canceled by user or system.
  CANCELED = 5;
  // Task run has not started yet.
  NOT_STARTED = 6;
  // Task run was skipped and will not execute.
  SKIPPED = 7;
  // Task run is ready for immediate execution.
  AVAILABLE = 8;
}
```
Step 2: Generate proto files
Run:
cd proto && buf generate
Expected: New generated files in backend/generated-go/ and frontend/src/types/proto-es/
Step 3: Commit
but commit task-scheduler-available-status -m "proto: add AVAILABLE status to TaskRun"
Files:
- backend/migrator/migration/3.14/0014##task_run_available_status.sql
- backend/migrator/migration/LATEST.sql:257,273
- backend/migrator/migrator_test.go (update TestLatestVersion)

Step 1: Create migration file
Create backend/migrator/migration/3.14/0014##task_run_available_status.sql:
```sql
-- Add AVAILABLE status to task_run CHECK constraint
ALTER TABLE task_run DROP CONSTRAINT task_run_status_check;
ALTER TABLE task_run ADD CONSTRAINT task_run_status_check
    CHECK (status IN ('PENDING', 'AVAILABLE', 'RUNNING', 'DONE', 'FAILED', 'CANCELED'));

-- Update partial index to include AVAILABLE
DROP INDEX idx_task_run_active_status_id;
CREATE INDEX idx_task_run_active_status_id ON task_run(status, id)
    WHERE status IN ('PENDING', 'AVAILABLE', 'RUNNING');
```
Step 2: Update LATEST.sql
In backend/migrator/migration/LATEST.sql, line 257, change the CHECK constraint to:

```sql
status text NOT NULL CHECK (status IN ('PENDING', 'AVAILABLE', 'RUNNING', 'DONE', 'FAILED', 'CANCELED')),
```

And line 273, change the index to:

```sql
CREATE INDEX idx_task_run_active_status_id ON task_run(status, id) WHERE status IN ('PENDING', 'AVAILABLE', 'RUNNING');
```
Step 3: Update migrator test
In backend/migrator/migrator_test.go, update TestLatestVersion to expect:
- version: 3.14.14
- file: migration/3.14/0014##task_run_available_status.sql

Step 4: Commit
but commit task-scheduler-available-status -m "migration: add AVAILABLE status to task_run"
Files:
- backend/store/task_run.go:297-304

Step 1: Update CreatePendingTaskRuns to check AVAILABLE
In backend/store/task_run.go, the CreatePendingTaskRuns function checks for existing active task runs. Update lines 303-304 to include AVAILABLE:

```go
storepb.TaskRun_PENDING.String(), storepb.TaskRun_AVAILABLE.String(), storepb.TaskRun_RUNNING.String(), storepb.TaskRun_DONE.String())
```
Step 2: Run tests
go test -v -count=1 github.com/bytebase/bytebase/backend/store -run ^TestTaskRun
Step 3: Commit
but commit task-scheduler-available-status -m "store: include AVAILABLE in active task run check"
Files:
- backend/store/task.go:107-160

Step 1: Update query to check for AVAILABLE status
The FindBlockingTaskByVersion function checks if there are blocking tasks. It currently considers any task whose latest status is not DONE as blocking. This is correct - AVAILABLE tasks should also block.
No change needed - the query already blocks on anything that's not DONE.
Step 2: Verify logic
Run:
go build ./backend/...
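To illustrate the blocking rule that Step 1 relies on (any task with a smaller version whose latest status is not DONE blocks later versions, which is why AVAILABLE tasks block too), here is a minimal in-memory sketch. versionedTask and findBlockingTask are hypothetical stand-ins for the real FindBlockingTaskByVersion store query.

```go
package main

import "fmt"

// versionedTask is a simplified stand-in for a task row on one database.
type versionedTask struct {
	ID      int
	Version string // assumes lexicographically comparable versions, e.g. zero-padded
	Status  string
}

// findBlockingTask returns the first task with a smaller version whose latest
// status is not DONE, mirroring the behavior described for FindBlockingTaskByVersion.
func findBlockingTask(tasks []versionedTask, version string) *versionedTask {
	for i := range tasks {
		t := &tasks[i]
		if t.Version < version && t.Status != "DONE" {
			return t
		}
	}
	return nil
}

func main() {
	tasks := []versionedTask{
		{ID: 1, Version: "0001", Status: "DONE"},
		{ID: 2, Version: "0002", Status: "AVAILABLE"}, // not DONE: blocks version 0003
	}
	if b := findBlockingTask(tasks, "0003"); b != nil {
		fmt.Printf("task %d blocks\n", b.ID)
	}
}
```

Because the predicate is simply "not DONE", the query needs no change when the AVAILABLE status is introduced.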
Files:
- backend/runner/taskrun/pending_scheduler.go

Step 1: Add helper function for database mutual exclusion check
Add after line 16:
```go
// checkDatabaseMutualExclusion checks if there's already an AVAILABLE or RUNNING task on the database.
// Returns true if the task can proceed (no conflict), false otherwise.
func (s *Scheduler) checkDatabaseMutualExclusion(ctx context.Context, task *store.TaskMessage, availableDBs map[string]bool) (bool, *int, error) {
	if task.DatabaseName == nil {
		return true, nil, nil
	}
	if !isSequentialTask(task) {
		return true, nil, nil
	}
	databaseKey := getDatabaseKey(task.InstanceID, *task.DatabaseName)
	// Check in-memory tracking first (tasks promoted this round).
	if availableDBs[databaseKey] {
		return false, nil, nil
	}
	// Check database for AVAILABLE or RUNNING tasks on the same database.
	activeTaskRuns, err := s.store.ListTaskRuns(ctx, &store.FindTaskRunMessage{
		Status: &[]storepb.TaskRun_Status{storepb.TaskRun_AVAILABLE, storepb.TaskRun_RUNNING},
	})
	if err != nil {
		return false, nil, errors.Wrapf(err, "failed to list active task runs")
	}
	for _, tr := range activeTaskRuns {
		activeTask, err := s.store.GetTaskByID(ctx, tr.TaskUID)
		if err != nil {
			return false, nil, errors.Wrapf(err, "failed to get task")
		}
		if activeTask.DatabaseName == nil {
			continue
		}
		if !isSequentialTask(activeTask) {
			continue
		}
		if getDatabaseKey(activeTask.InstanceID, *activeTask.DatabaseName) == databaseKey {
			return false, &activeTask.ID, nil
		}
	}
	return true, nil, nil
}
```
Step 2: Add helper function for parallel limit check
Add after the previous function:
```go
// checkParallelLimit checks if promoting this task would exceed the parallel task limit.
// Returns true if the task can proceed, false otherwise.
func (s *Scheduler) checkParallelLimit(ctx context.Context, task *store.TaskMessage, rolloutCounts map[int64]int) (bool, error) {
	plan, err := s.store.GetPlan(ctx, &store.FindPlanMessage{UID: &task.PlanID})
	if err != nil {
		return false, errors.Wrapf(err, "failed to get plan")
	}
	if plan == nil {
		return false, errors.Errorf("plan %v not found", task.PlanID)
	}
	project, err := s.store.GetProject(ctx, &store.FindProjectMessage{ResourceID: &plan.ProjectID})
	if err != nil {
		return false, errors.Wrapf(err, "failed to get project")
	}
	if project == nil {
		return false, errors.Errorf("project %v not found", plan.ProjectID)
	}
	maxParallel := int(project.Setting.GetParallelTasksPerRollout())
	if maxParallel <= 0 {
		// No limit.
		return true, nil
	}
	// Count current AVAILABLE + RUNNING for this rollout.
	activeTaskRuns, err := s.store.ListTaskRuns(ctx, &store.FindTaskRunMessage{
		PlanUID: &task.PlanID,
		Status:  &[]storepb.TaskRun_Status{storepb.TaskRun_AVAILABLE, storepb.TaskRun_RUNNING},
	})
	if err != nil {
		return false, errors.Wrapf(err, "failed to list active task runs")
	}
	currentCount := len(activeTaskRuns) + rolloutCounts[task.PlanID]
	return currentCount < maxParallel, nil
}
```
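The arithmetic behind this check combines two counts: AVAILABLE/RUNNING rows already persisted, plus tasks promoted earlier in the same scheduling round (rolloutCounts), so a single pass over pending task runs cannot over-commit. A minimal sketch of just that comparison, with a hypothetical helper name:

```go
package main

import "fmt"

// withinParallelLimit mirrors the check above: a task may be promoted only if
// the persisted active count plus this round's in-memory promotions stays
// below the configured limit. A limit of zero or less means unlimited.
func withinParallelLimit(persistedActive, promotedThisRound, maxParallel int) bool {
	if maxParallel <= 0 {
		return true // no limit configured
	}
	return persistedActive+promotedThisRound < maxParallel
}

func main() {
	// With a limit of 2, one RUNNING row and one promotion this round,
	// the next candidate must wait.
	fmt.Println(withinParallelLimit(1, 1, 2)) // false
	fmt.Println(withinParallelLimit(1, 0, 2)) // true
}
```

Without the in-round count, every candidate in one pass would see the same persisted count and all be promoted at once.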
Step 3: Rewrite schedulePendingTaskRuns with in-memory tracking
Replace the schedulePendingTaskRuns function (lines 41-55):
```go
func (s *Scheduler) schedulePendingTaskRuns(ctx context.Context) error {
	taskRuns, err := s.store.ListTaskRuns(ctx, &store.FindTaskRunMessage{
		Status: &[]storepb.TaskRun_Status{storepb.TaskRun_PENDING},
	})
	if err != nil {
		return errors.Wrapf(err, "failed to list pending tasks")
	}
	// Track what we've promoted this round to avoid over-committing.
	availableDBs := map[string]bool{} // database key -> has AVAILABLE this round
	rolloutCounts := map[int64]int{}  // plan_id -> count promoted this round
	for _, taskRun := range taskRuns {
		promoted, err := s.schedulePendingTaskRun(ctx, taskRun, availableDBs, rolloutCounts)
		if err != nil {
			slog.Error("failed to schedule pending task run", log.BBError(err))
			continue
		}
		if promoted {
			task, err := s.store.GetTaskByID(ctx, taskRun.TaskUID)
			if err != nil {
				slog.Error("failed to get task after promotion", log.BBError(err))
				continue
			}
			if task.DatabaseName != nil && isSequentialTask(task) {
				availableDBs[getDatabaseKey(task.InstanceID, *task.DatabaseName)] = true
			}
			rolloutCounts[task.PlanID]++
		}
	}
	return nil
}
```
Step 4: Rewrite schedulePendingTaskRun with all gating logic
Replace the schedulePendingTaskRun function (lines 57-129):
```go
func (s *Scheduler) schedulePendingTaskRun(ctx context.Context, taskRun *store.TaskRunMessage, availableDBs map[string]bool, rolloutCounts map[int64]int) (bool, error) {
	task, err := s.store.GetTaskByID(ctx, taskRun.TaskUID)
	if err != nil {
		return false, errors.Wrapf(err, "failed to get task")
	}
	// Check 1: RunAt time.
	if taskRun.RunAt != nil && time.Now().Before(*taskRun.RunAt) {
		return false, nil
	}
	// Check 2: version ordering (blocking tasks with smaller versions).
	if task.DatabaseName != nil {
		schemaVersion := task.Payload.GetSchemaVersion()
		if schemaVersion != "" {
			maybeTaskID, err := s.store.FindBlockingTaskByVersion(ctx, task.PlanID, task.InstanceID, *task.DatabaseName, schemaVersion)
			if err != nil {
				return false, errors.Wrapf(err, "failed to find blocking versioned tasks")
			}
			if maybeTaskID != nil {
				s.stateCfg.TaskRunSchedulerInfo.Store(taskRun.ID, &storepb.SchedulerInfo{
					ReportTime: timestamppb.Now(),
					WaitingCause: &storepb.SchedulerInfo_WaitingCause{
						Cause: &storepb.SchedulerInfo_WaitingCause_TaskUid{
							TaskUid: int32(*maybeTaskID),
						},
					},
				})
				return false, nil
			}
		}
	}
	// Check 3: database mutual exclusion (for sequential tasks).
	canProceed, blockingTaskID, err := s.checkDatabaseMutualExclusion(ctx, task, availableDBs)
	if err != nil {
		return false, errors.Wrapf(err, "failed to check database mutual exclusion")
	}
	if !canProceed {
		if blockingTaskID != nil {
			s.stateCfg.TaskRunSchedulerInfo.Store(taskRun.ID, &storepb.SchedulerInfo{
				ReportTime: timestamppb.Now(),
				WaitingCause: &storepb.SchedulerInfo_WaitingCause{
					Cause: &storepb.SchedulerInfo_WaitingCause_TaskUid{
						TaskUid: int32(*blockingTaskID),
					},
				},
			})
		}
		return false, nil
	}
	// Check 4: parallel task limit per rollout.
	withinLimit, err := s.checkParallelLimit(ctx, task, rolloutCounts)
	if err != nil {
		return false, errors.Wrapf(err, "failed to check parallel limit")
	}
	if !withinLimit {
		s.stateCfg.TaskRunSchedulerInfo.Store(taskRun.ID, &storepb.SchedulerInfo{
			ReportTime: timestamppb.Now(),
			WaitingCause: &storepb.SchedulerInfo_WaitingCause{
				Cause: &storepb.SchedulerInfo_WaitingCause_ParallelTasksLimit{
					ParallelTasksLimit: true,
				},
			},
		})
		return false, nil
	}
	// All checks passed - promote to AVAILABLE.
	s.stateCfg.TaskRunSchedulerInfo.Delete(taskRun.ID)
	if _, err := s.store.UpdateTaskRunStatus(ctx, &store.TaskRunStatusPatch{
		ID:      taskRun.ID,
		Updater: common.SystemBotEmail,
		Status:  storepb.TaskRun_AVAILABLE,
	}); err != nil {
		return false, errors.Wrapf(err, "failed to update task run status to available")
	}
	s.store.CreateTaskRunLogS(ctx, taskRun.ID, time.Now(), s.profile.DeployID, &storepb.TaskRunLog{
		Type: storepb.TaskRunLog_TASK_RUN_STATUS_UPDATE,
		TaskRunStatusUpdate: &storepb.TaskRunLog_TaskRunStatusUpdate{
			Status: storepb.TaskRunLog_TaskRunStatusUpdate_RUNNING_WAITING,
		},
	})
	// Tickle the running scheduler.
	select {
	case s.stateCfg.TaskRunTickleChan <- 0:
	default:
	}
	return true, nil
}
```
Step 5: Add import for isSequentialTask
The isSequentialTask function currently lives in running_scheduler.go. Because both files belong to the same taskrun package, no import or relocation is needed; just ensure the function remains in the package.
Step 6: Build and verify
go build ./backend/runner/taskrun/...
Step 7: Commit
but commit task-scheduler-available-status -m "pending_scheduler: add all gating logic for PENDING->AVAILABLE"
Files:
- backend/runner/taskrun/running_scheduler.go

Step 1: Rewrite scheduleRunningTaskRuns to query AVAILABLE
Replace scheduleRunningTaskRuns function (lines 49-90):
```go
func (s *Scheduler) scheduleRunningTaskRuns(ctx context.Context) error {
	// Query AVAILABLE tasks (ready for execution).
	availableTaskRuns, err := s.store.ListTaskRuns(ctx, &store.FindTaskRunMessage{
		Status: &[]storepb.TaskRun_Status{storepb.TaskRun_AVAILABLE},
	})
	if err != nil {
		return errors.Wrapf(err, "failed to list available task runs")
	}
	for _, taskRun := range availableTaskRuns {
		if err := s.claimAndExecuteTaskRun(ctx, taskRun); err != nil {
			slog.Error("failed to claim and execute task run", log.BBError(err))
		}
	}
	// Also re-execute orphaned RUNNING tasks (for restart recovery).
	runningTaskRuns, err := s.store.ListTaskRuns(ctx, &store.FindTaskRunMessage{
		Status: &[]storepb.TaskRun_Status{storepb.TaskRun_RUNNING},
	})
	if err != nil {
		return errors.Wrapf(err, "failed to list running task runs")
	}
	for _, taskRun := range runningTaskRuns {
		// Skip if already executing in this instance.
		if _, ok := s.stateCfg.RunningTaskRuns.Load(taskRun.ID); ok {
			continue
		}
		// Re-execute the orphaned RUNNING task.
		if err := s.executeTaskRun(ctx, taskRun); err != nil {
			slog.Error("failed to re-execute orphaned task run", log.BBError(err))
		}
	}
	return nil
}
```
Step 2: Add claimAndExecuteTaskRun function
Add new function after scheduleRunningTaskRuns:
```go
// claimAndExecuteTaskRun attempts to atomically claim an AVAILABLE task and execute it.
func (s *Scheduler) claimAndExecuteTaskRun(ctx context.Context, taskRun *store.TaskRunMessage) error {
	// Optimistic locking: attempt to claim by updating AVAILABLE -> RUNNING.
	claimed, err := s.store.ClaimAvailableTaskRun(ctx, taskRun.ID)
	if err != nil {
		return errors.Wrapf(err, "failed to claim task run")
	}
	if !claimed {
		// Another instance claimed it.
		return nil
	}
	return s.executeTaskRun(ctx, taskRun)
}

// executeTaskRun executes a task run that is already in RUNNING status.
func (s *Scheduler) executeTaskRun(ctx context.Context, taskRun *store.TaskRunMessage) error {
	task, err := s.store.GetTaskByID(ctx, taskRun.TaskUID)
	if err != nil {
		return errors.Wrapf(err, "failed to get task")
	}
	instance, err := s.store.GetInstance(ctx, &store.FindInstanceMessage{ResourceID: &task.InstanceID})
	if err != nil {
		return errors.Wrapf(err, "failed to get instance")
	}
	if instance.Deleted {
		return errors.Errorf("instance %v is deleted", task.InstanceID)
	}
	executor, ok := s.executorMap[task.Type]
	if !ok {
		return errors.Errorf("executor not found for task type: %v", task.Type)
	}
	// Update started_at.
	if err := s.store.UpdateTaskRunStartAt(ctx, taskRun.ID); err != nil {
		return errors.Wrapf(err, "failed to update task run start at")
	}
	// Register as running.
	s.stateCfg.RunningTaskRuns.Store(taskRun.ID, true)
	s.store.CreateTaskRunLogS(ctx, taskRun.ID, time.Now(), s.profile.DeployID, &storepb.TaskRunLog{
		Type: storepb.TaskRunLog_TASK_RUN_STATUS_UPDATE,
		TaskRunStatusUpdate: &storepb.TaskRunLog_TaskRunStatusUpdate{
			Status: storepb.TaskRunLog_TaskRunStatusUpdate_RUNNING_RUNNING,
		},
	})
	go s.runTaskRunOnce(ctx, taskRun, task, executor)
	return nil
}
```
Step 3: Remove old scheduleRunningTaskRun function
Delete the old scheduleRunningTaskRun function (lines 92-209) - it's replaced by the simplified logic above.
Step 4: Update runTaskRunOnce to remove in-memory state tracking
In runTaskRunOnce function, remove the database migration tracking since it's no longer needed. Update the defer block (lines 221-228):
```go
defer func() {
	// Also clear the RunningTaskRuns entry stored at execution start;
	// otherwise the orphan-recovery loop would skip this task run forever.
	s.stateCfg.RunningTaskRuns.Delete(taskRun.ID)
	s.stateCfg.RunningTaskRunsCancelFunc.Delete(taskRun.ID)
}()
```
Remove references to:
- s.stateCfg.RunningDatabaseMigration
- s.stateCfg.RolloutOutstandingTasks

Step 5: Build and verify
go build ./backend/runner/taskrun/...
Step 6: Commit
but commit task-scheduler-available-status -m "running_scheduler: simplify to claim AVAILABLE and execute"
Files:
- backend/store/task_run.go

Step 1: Add ClaimAvailableTaskRun function
Add after UpdateTaskRunStatus:
```go
// ClaimAvailableTaskRun attempts to atomically claim an AVAILABLE task run by updating it to RUNNING.
// Returns true if the claim succeeded, false if another process claimed it first.
func (s *Store) ClaimAvailableTaskRun(ctx context.Context, taskRunID int) (bool, error) {
	q := qb.Q().Space(`
		UPDATE task_run
		SET status = ?, updated_at = now()
		WHERE id = ? AND status = ?
	`, storepb.TaskRun_RUNNING.String(), taskRunID, storepb.TaskRun_AVAILABLE.String())
	query, args, err := q.ToSQL()
	if err != nil {
		return false, errors.Wrapf(err, "failed to build sql")
	}
	result, err := s.GetDB().ExecContext(ctx, query, args...)
	if err != nil {
		return false, errors.Wrapf(err, "failed to claim task run")
	}
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return false, errors.Wrapf(err, "failed to get rows affected")
	}
	return rowsAffected == 1, nil
}
```
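This UPDATE is a compare-and-set: among concurrent claimers, exactly one observes rowsAffected == 1. The same guarantee can be demonstrated in-memory; here atomic.CompareAndSwapInt32 stands in for the conditional UPDATE, as a sketch of the pattern rather than the store code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	statusAvailable int32 = 0
	statusRunning   int32 = 1
)

// claim flips the status from AVAILABLE to RUNNING exactly once, the same
// way the conditional UPDATE only affects a row still in AVAILABLE.
func claim(status *int32) bool {
	return atomic.CompareAndSwapInt32(status, statusAvailable, statusRunning)
}

func main() {
	status := statusAvailable
	var wins int64
	var wg sync.WaitGroup
	// Ten schedulers race for the same task run; exactly one claim succeeds.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if claim(&status) {
				atomic.AddInt64(&wins, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", wins) // winners: 1
}
```

This is what makes the running scheduler safe to run on multiple instances: losing claimers simply move on, with no lock held across the execution.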
Step 2: Build and verify
go build ./backend/store/...
Step 3: Commit
but commit task-scheduler-available-status -m "store: add ClaimAvailableTaskRun for optimistic locking"
Files:
- backend/component/state/state.go

Step 1: Remove RunningDatabaseMigration field
This field is no longer needed since gating is done in pending_scheduler via DB queries. Remove lines 22-23:

```go
// RunningDatabaseMigration is the taskUID of the running migration on the database.
RunningDatabaseMigration sync.Map // map[databaseKey]taskUID
```
Step 2: Remove RolloutOutstandingTasks field
This is no longer needed. Remove lines 31-32:

```go
// RolloutOutstandingTasks is the maximum number of tasks per rollout.
RolloutOutstandingTasks *resourceLimiter
```
Also update the New() function to remove initialization.
Step 3: Build and verify all usages are removed
go build ./backend/...
Step 4: Commit
but commit task-scheduler-available-status -m "state: remove unused RunningDatabaseMigration and RolloutOutstandingTasks"
Files:
- backend/store/task_run.go:369-389

Step 1: Update to also cancel AVAILABLE tasks
The batch cancel must cover AVAILABLE tasks too. No change is needed: the update is applied by ID regardless of status.
Step 2: Verify the rollout service uses correct statuses
Check backend/api/v1/rollout_service.go for any status checks that need AVAILABLE added.
Files:
- frontend/src/components/RolloutV1/constants/task.ts
- frontend/src/components/Plan/constants/task.ts
- frontend/src/components/RolloutV1/components/utils/taskStatus.ts

Step 1: Update TASK_STATUS_FILTERS in RolloutV1
In frontend/src/components/RolloutV1/constants/task.ts:
```typescript
import { Task_Status } from "@/types/proto-es/v1/rollout_service_pb";

export const TASK_STATUS_FILTERS: Task_Status[] = [
  Task_Status.RUNNING,
  Task_Status.AVAILABLE,
  Task_Status.FAILED,
  Task_Status.PENDING,
  Task_Status.NOT_STARTED,
  Task_Status.CANCELED,
  Task_Status.DONE,
  Task_Status.SKIPPED,
];
```
Step 2: Update TASK_STATUS_FILTERS in Plan
In frontend/src/components/Plan/constants/task.ts:
```typescript
import { Task_Status } from "@/types/proto-es/v1/rollout_service_pb";

export const TASK_STATUS_FILTERS: Task_Status[] = [
  Task_Status.RUNNING,
  Task_Status.AVAILABLE,
  Task_Status.FAILED,
  Task_Status.PENDING,
  Task_Status.NOT_STARTED,
  Task_Status.CANCELED,
  Task_Status.DONE,
  Task_Status.SKIPPED,
];
```
Step 3: Update taskStatus.ts
In frontend/src/components/RolloutV1/components/utils/taskStatus.ts, add AVAILABLE to ACTIONABLE_STATUSES:
```typescript
const ACTIONABLE_STATUSES = new Set<Task_Status>([
  Task_Status.NOT_STARTED,
  Task_Status.PENDING,
  Task_Status.AVAILABLE,
  Task_Status.RUNNING,
  Task_Status.FAILED,
  Task_Status.CANCELED,
]);
```
Step 4: Commit
but commit task-scheduler-available-status -m "frontend: add AVAILABLE status to task filters"
Files:
- frontend/src/locales/en-US.json
- frontend/src/locales/zh-CN.json
- frontend/src/locales/ja-JP.json
- frontend/src/locales/es-ES.json

Step 1: Add translations
Find the task status section in each locale file and add an AVAILABLE translation.

For en-US.json:

```json
"available": "Available"
```

For zh-CN.json:

```json
"available": "就绪"
```

For ja-JP.json and es-ES.json, add the corresponding entries; suggested translations (verify with a native speaker):

```json
"available": "利用可能"
```

```json
"available": "Disponible"
```
Step 2: Commit
but commit task-scheduler-available-status -m "i18n: add AVAILABLE status translations"
Step 1: Run Go linter
golangci-lint run --allow-parallel-runners
Fix any issues. Run repeatedly until clean.
Step 2: Run frontend checks
pnpm --dir frontend type-check
pnpm --dir frontend biome:check
Step 3: Run backend build
go build -ldflags "-w -s" -p=16 -o ./bytebase-build/bytebase ./backend/bin/server/main.go
Step 4: Commit any fixes
but commit task-scheduler-available-status -m "chore: fix lint issues"
Step 1: Review all changes
but status
git diff main...HEAD
Step 2: Push branch
but push task-scheduler-available-status
Step 3: Create PR
```sh
gh pr create --base main --head task-scheduler-available-status \
  --title "refactor: add AVAILABLE status to task scheduler" \
  --body "$(cat <<'EOF'
## Summary
- Add AVAILABLE status between PENDING and RUNNING for task runs
- Centralize all gating logic in pending_scheduler (RunAt, version ordering, database locks, parallel limits)
- Simplify running_scheduler to just claim AVAILABLE tasks and execute
- Prepare foundation for HA with optimistic locking

## Test plan
- [ ] Verify PENDING tasks transition to AVAILABLE when all constraints are met
- [ ] Verify AVAILABLE tasks are claimed and executed
- [ ] Verify database mutual exclusion works (only one DDL/SDL per database)
- [ ] Verify parallel limit is respected
- [ ] Verify frontend shows AVAILABLE status correctly

🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"
```