docs/plans/2025-12-31-ha-webhook-delivery-impl.md
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement database-backed webhook delivery so that, in HA deployments, each PIPELINE_FAILED or PIPELINE_COMPLETED notification for a plan is sent by only one replica.
Architecture: Replace the in-memory TaskSkippedOrDoneChan channel with a plan_webhook_delivery table whose PRIMARY KEY on plan_id lets exactly one replica atomically claim each notification.
Tech Stack: Go, PostgreSQL, existing webhook infrastructure
Files:
- backend/migrator/migration/3.14/0016##plan_webhook_delivery.sql

Step 1: Create migration file
Create backend/migrator/migration/3.14/0016##plan_webhook_delivery.sql:
```sql
-- Tracks webhook delivery for pipeline events (PIPELINE_FAILED or PIPELINE_COMPLETED).
-- One row per plan at any time; the two events are mutually exclusive.
-- The row is deleted when the user clicks BatchRunTasks to reset notification state.
CREATE TABLE plan_webhook_delivery (
    plan_id BIGINT PRIMARY KEY REFERENCES plan(id),
    -- Event type: 'PIPELINE_FAILED' or 'PIPELINE_COMPLETED'
    event_type TEXT NOT NULL,
    delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```
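The table comments describe a small state machine: the first claim for a plan wins, any later claim for either event loses, and only the DELETE issued by BatchRunTasks reopens the slot. A minimal Go sketch of that lifecycle follows; it models the table as a map keyed by plan_id, which is an in-memory stand-in for illustration only, not the store code.

```go
package main

import "fmt"

// deliveryTable models plan_webhook_delivery: at most one row per plan_id.
type deliveryTable map[int64]string

// claim mirrors INSERT ... ON CONFLICT (plan_id) DO NOTHING:
// it returns true only if no row existed for the plan.
func (d deliveryTable) claim(planID int64, event string) bool {
	if _, ok := d[planID]; ok {
		return false
	}
	d[planID] = event
	return true
}

// reset mirrors DELETE FROM plan_webhook_delivery WHERE plan_id = $1,
// which BatchRunTasks issues before a retry.
func (d deliveryTable) reset(planID int64) { delete(d, planID) }

func main() {
	d := deliveryTable{}
	fmt.Println(d.claim(1, "PIPELINE_FAILED"))    // true: first failure notifies
	fmt.Println(d.claim(1, "PIPELINE_FAILED"))    // false: later failures are suppressed
	fmt.Println(d.claim(1, "PIPELINE_COMPLETED")) // false: events are mutually exclusive
	d.reset(1)                                    // user clicks BatchRunTasks
	fmt.Println(d.claim(1, "PIPELINE_COMPLETED")) // true: the retry can notify again
}
```

One consequence worth checking in review: because BatchRunTasks only logs a failed reset, a row that fails to delete keeps suppressing both events for the retried run.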
Step 2: Update LATEST.sql
Add to backend/migrator/migration/LATEST.sql after the plan_check_run table (around line 231):
```sql
-- Tracks webhook delivery for pipeline events (PIPELINE_FAILED or PIPELINE_COMPLETED).
-- One row per plan at any time; the two events are mutually exclusive.
-- The row is deleted when the user clicks BatchRunTasks to reset notification state.
CREATE TABLE plan_webhook_delivery (
    plan_id BIGINT PRIMARY KEY REFERENCES plan(id),
    -- Event type: 'PIPELINE_FAILED' or 'PIPELINE_COMPLETED'
    event_type TEXT NOT NULL,
    delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- No ALTER SEQUENCE ... RESTART line for this table: plan_id is a plain BIGINT
-- foreign key, not a serial column, so plan_webhook_delivery_plan_id_seq does not exist.
```
Step 3: Verify migration files
Run: ls backend/migrator/migration/3.14/
Expected: See 0016##plan_webhook_delivery.sql
Step 4: Commit migration
```bash
git add backend/migrator/migration/3.14/0016##plan_webhook_delivery.sql backend/migrator/migration/LATEST.sql
git commit -m "feat: add plan_webhook_delivery table for HA webhook deduplication

Table provides atomic claim mechanism for webhook delivery across HA replicas
using PRIMARY KEY constraint. Deleted on BatchRunTasks to reset state.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/store/plan_webhook_delivery.go

Step 1: Create store methods file
Create backend/store/plan_webhook_delivery.go:
```go
package store

import (
	"context"
	"database/sql"

	"github.com/pkg/errors"
)

// ResetPlanWebhookDelivery deletes the webhook delivery record for a plan.
// Called when the user clicks BatchRunTasks so that the retry can notify again.
func (s *Store) ResetPlanWebhookDelivery(ctx context.Context, planID int64) error {
	query := `DELETE FROM plan_webhook_delivery WHERE plan_id = $1`
	if _, err := s.db.ExecContext(ctx, query, planID); err != nil {
		return errors.Wrapf(err, "failed to reset webhook delivery for plan %d", planID)
	}
	return nil
}

// ClaimPipelineFailureNotification attempts to claim the right to send the PIPELINE_FAILED webhook.
// Returns true if claimed (caller should send), false if already sent or claimed by another replica.
// HA-safe: the PRIMARY KEY constraint prevents duplicate sends across replicas.
func (s *Store) ClaimPipelineFailureNotification(ctx context.Context, planID int64) (bool, error) {
	return s.claimPlanWebhookDelivery(ctx, planID, "PIPELINE_FAILED")
}

// ClaimPipelineCompletionNotification attempts to claim the right to send the PIPELINE_COMPLETED webhook.
// Returns true if claimed (caller should send), false if already sent or claimed by another replica.
// HA-safe: the PRIMARY KEY constraint prevents duplicate sends across replicas.
func (s *Store) ClaimPipelineCompletionNotification(ctx context.Context, planID int64) (bool, error) {
	return s.claimPlanWebhookDelivery(ctx, planID, "PIPELINE_COMPLETED")
}

// claimPlanWebhookDelivery inserts the plan's delivery row. When a row already
// exists, ON CONFLICT DO NOTHING skips the insert, RETURNING yields no row, and
// Scan reports sql.ErrNoRows.
func (s *Store) claimPlanWebhookDelivery(ctx context.Context, planID int64, eventType string) (bool, error) {
	query := `
		INSERT INTO plan_webhook_delivery (plan_id, event_type)
		VALUES ($1, $2)
		ON CONFLICT (plan_id) DO NOTHING
		RETURNING plan_id
	`
	var id int64
	err := s.db.QueryRowContext(ctx, query, planID, eventType).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil // Already exists: claimed earlier or by another replica
	}
	if err != nil {
		return false, errors.Wrapf(err, "failed to claim %s webhook for plan %d", eventType, planID)
	}
	return true, nil
}
```
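The HA guarantee rests on PostgreSQL enforcing the PRIMARY KEY atomically: when several replicas run the same `INSERT ... ON CONFLICT (plan_id) DO NOTHING RETURNING plan_id` concurrently, one gets a row back and the others get no row (`sql.ErrNoRows`). The sketch below imitates that race in a single process, assuming a mutex-guarded map is an acceptable stand-in for the constraint, with goroutines playing the replicas. It illustrates the contract the store methods rely on; it does not exercise the database.

```go
package main

import (
	"fmt"
	"sync"
)

// claimTable is an in-memory stand-in for plan_webhook_delivery; the map key
// plays the role of the plan_id PRIMARY KEY.
type claimTable struct {
	mu   sync.Mutex
	rows map[int64]string // plan_id -> event_type
}

// claim imitates INSERT ... ON CONFLICT (plan_id) DO NOTHING RETURNING plan_id:
// it reports true only to the caller whose insert created the row.
func (t *claimTable) claim(planID int64, eventType string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.rows[planID]; exists {
		return false // conflict: the real query returns no row (sql.ErrNoRows)
	}
	t.rows[planID] = eventType
	return true
}

// raceReplicas starts n goroutines ("replicas") that all try to claim the same
// plan at once, and returns how many of them won.
func raceReplicas(n int, planID int64) int {
	table := &claimTable{rows: make(map[int64]string)}
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		winners int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if table.claim(planID, "PIPELINE_FAILED") {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(raceReplicas(5, 42)) // 1: exactly one replica wins
}
```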
Step 2: Build to verify
Run: go build ./backend/store/...
Expected: Build succeeds
Step 3: Commit store methods
```bash
git add backend/store/plan_webhook_delivery.go
git commit -m "feat: add store methods for plan webhook delivery

Implement atomic claim methods for PIPELINE_FAILED and PIPELINE_COMPLETED
webhooks using INSERT...ON CONFLICT for HA safety. Add reset method for
BatchRunTasks.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/api/v1/rollout_service.go:627-752

Step 1: Add reset call to BatchRunTasks
In backend/api/v1/rollout_service.go, add after getting the plan (around line 655):
```go
// Reset notification state so the user gets fresh feedback on retry.
if err := s.store.ResetPlanWebhookDelivery(ctx, planID); err != nil {
	slog.Error("failed to reset plan webhook delivery", log.BBError(err))
	// Don't fail the request: notification is non-critical.
}
```
Full context: the function should look like this:
```go
func (s *RolloutService) BatchRunTasks(ctx context.Context, req *connect.Request[v1pb.BatchRunTasksRequest]) (*connect.Response[v1pb.BatchRunTasksResponse], error) {
	request := req.Msg
	// ... existing validation code ...
	plan, err := s.store.GetPlan(ctx, &store.FindPlanMessage{
		ProjectID: &projectID,
		UID:       &planID,
	})
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, errors.Wrapf(err, "failed to find plan for rollout"))
	}
	if plan == nil {
		return nil, connect.NewError(connect.CodeNotFound, errors.Errorf("rollout (plan) %v not found", planID))
	}
	// Reset notification state so the user gets fresh feedback on retry.
	if err := s.store.ResetPlanWebhookDelivery(ctx, planID); err != nil {
		slog.Error("failed to reset plan webhook delivery", log.BBError(err))
		// Don't fail the request: notification is non-critical.
	}
	// ... rest of existing code ...
}
```
Step 2: Build to verify
Run: go build ./backend/api/v1/...
Expected: Build succeeds
Step 3: Commit BatchRunTasks integration
```bash
git add backend/api/v1/rollout_service.go
git commit -m "feat: reset webhook delivery state on BatchRunTasks

Delete plan_webhook_delivery row when user retries tasks to enable fresh
notifications on retry attempts.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/runner/taskrun/scheduler.go

Step 1: Add helper method to get failed task runs
Add near the end of backend/runner/taskrun/scheduler.go:
```go
// getFailedTaskRuns returns all failed task runs for a plan to include in the webhook payload.
func (s *Scheduler) getFailedTaskRuns(ctx context.Context, planID int64) []webhook.FailedTask {
	tasks, err := s.store.ListTasks(ctx, &store.TaskFind{PlanID: &planID})
	if err != nil {
		slog.Error("failed to list tasks for failed webhook", log.BBError(err))
		return nil
	}
	var failures []webhook.FailedTask
	for _, task := range tasks {
		if task.LatestTaskRunStatus != storepb.TaskRun_FAILED {
			continue
		}
		// Get the latest failed task run
		taskRuns, err := s.store.ListTaskRuns(ctx, &store.TaskRunFind{TaskID: &task.ID})
		if err != nil {
			slog.Error("failed to list task runs", log.BBError(err))
			continue
		}
		// Find the latest failed run
		var latestFailed *store.TaskRunMessage
		for _, tr := range taskRuns {
			if tr.Status == storepb.TaskRun_FAILED {
				if latestFailed == nil || tr.UpdatedAt.After(*latestFailed.UpdatedAt) {
					latestFailed = tr
				}
			}
		}
		if latestFailed == nil {
			continue
		}
		errorMsg := ""
		if latestFailed.Result != nil && latestFailed.Result.Error != "" {
			errorMsg = latestFailed.Result.Error
		}
		failures = append(failures, webhook.FailedTask{
			TaskID:       int64(task.ID),
			TaskName:     task.Name,
			DatabaseName: task.DatabaseName,
			InstanceName: task.Instance.ResourceID,
			ErrorMessage: errorMsg,
			FailedAt:     *latestFailed.UpdatedAt,
		})
	}
	return failures
}
```
Step 2: Build to verify
Run: go build ./backend/runner/taskrun/...
Expected: Build succeeds
Step 3: Commit helper method
```bash
git add backend/runner/taskrun/scheduler.go
git commit -m "feat: add helper to get failed task runs for webhook

Collects all failed tasks in a plan with error messages for webhook payload.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/runner/taskrun/running_scheduler.go

Step 1: Check where task is marked as FAILED
Read backend/runner/taskrun/running_scheduler.go around lines 200-240 to find where the task run status is set to FAILED.
Expected: Find code like:
```go
if _, err := s.store.UpdateTaskRunStatus(ctx, &store.TaskRunStatusPatch{
	ID:     taskRun.ID,
	Status: storepb.TaskRun_FAILED,
	...
}); err != nil {
	...
}
```
Step 2: Add webhook notification after marking task as FAILED
After the task is marked as FAILED and before the return statement, add:
```go
// Send PIPELINE_FAILED webhook (HA-safe atomic claim)
s.sendPipelineFailureWebhook(ctx, task)
```
Step 3: Implement sendPipelineFailureWebhook method
Add to backend/runner/taskrun/running_scheduler.go:
```go
// sendPipelineFailureWebhook attempts to send the PIPELINE_FAILED webhook.
// It uses an atomic claim to prevent duplicate sends in HA deployments.
// The claim is taken before the payload is built, so if a later step fails, the
// webhook is not sent for this plan until BatchRunTasks resets the row.
func (s *Scheduler) sendPipelineFailureWebhook(ctx context.Context, task *store.TaskMessage) {
	// Try to claim the notification (only one replica succeeds).
	claimed, err := s.store.ClaimPipelineFailureNotification(ctx, task.PlanID)
	if err != nil {
		slog.Error("failed to claim pipeline failure notification", log.BBError(err))
		return
	}
	if !claimed {
		// Already sent by this or another replica.
		return
	}
	// Get plan context. Log lookup errors and missing rows separately so a nil
	// error is never passed to log.BBError.
	plan, err := s.store.GetPlan(ctx, &store.FindPlanMessage{UID: &task.PlanID})
	if err != nil {
		slog.Error("failed to get plan for failure webhook", log.BBError(err))
		return
	}
	if plan == nil {
		slog.Error("plan not found for failure webhook", slog.Int64("plan_id", task.PlanID))
		return
	}
	project, err := s.store.GetProject(ctx, &store.FindProjectMessage{ResourceID: &plan.ProjectID})
	if err != nil {
		slog.Error("failed to get project for failure webhook", log.BBError(err))
		return
	}
	if project == nil {
		slog.Error("project not found for failure webhook", slog.String("project_id", plan.ProjectID))
		return
	}
	// Get all failed tasks.
	failures := s.getFailedTaskRuns(ctx, task.PlanID)
	if len(failures) == 0 {
		slog.Warn("no failed tasks found for pipeline failure webhook", slog.Int64("plan_id", task.PlanID))
		return
	}
	// Send webhook.
	s.webhookManager.CreateEvent(ctx, &webhook.Event{
		Type:    storepb.Activity_PIPELINE_FAILED,
		Project: webhook.NewProject(project),
		PipelineFailed: &webhook.EventPipelineFailed{
			Rollout:     webhook.NewRollout(plan),
			FailedTasks: failures,
		},
	})
}
```
Step 4: Build to verify
Run: go build ./backend/runner/taskrun/...
Expected: Build succeeds
Step 5: Commit pipeline failure webhook
```bash
git add backend/runner/taskrun/running_scheduler.go
git commit -m "feat: send PIPELINE_FAILED webhook on task failure

Add atomic claim-based webhook delivery when task fails. Only first failure
in a plan triggers notification, preventing spam in HA deployments.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/runner/taskrun/scheduler.go:112-210

Step 1: Add claim check before sending completion webhook
In backend/runner/taskrun/scheduler.go, find the checkPlanCompletion function (around line 112).
After the if hasFailures { return } check (around line 167), add:
```go
// Try to claim completion notification (HA-safe)
claimed, err := s.store.ClaimPipelineCompletionNotification(ctx, planID)
if err != nil {
	slog.Error("failed to claim pipeline completion notification", log.BBError(err))
	return
}
if !claimed {
	return // Already sent
}
```
Full context: the section should look like this:
```go
// Not all tasks complete yet
if !allComplete {
	return
}
// Always clear the failure window when plan completes
s.pipelineEvents.Clear(planID)
// Only send completion webhook if there were no failures
if hasFailures {
	return
}
// Try to claim completion notification (HA-safe)
claimed, err := s.store.ClaimPipelineCompletionNotification(ctx, planID)
if err != nil {
	slog.Error("failed to claim pipeline completion notification", log.BBError(err))
	return
}
if !claimed {
	return // Already sent
}
// ... existing webhook send code continues ...
```
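The completion claim sits behind two gates: every task must have reached a terminal state, and none may have failed. The plan does not show how checkPlanCompletion computes `allComplete` and `hasFailures`, so the sketch below assumes DONE, SKIPPED and FAILED are the terminal statuses; `taskStatus` and `shouldClaimCompletion` are hypothetical names for illustration.

```go
package main

import "fmt"

// taskStatus is a hypothetical simplification of a task's latest run status.
type taskStatus int

const (
	statusPending taskStatus = iota
	statusRunning
	statusDone
	statusSkipped
	statusFailed
)

// shouldClaimCompletion reports whether checkPlanCompletion would go on to
// ClaimPipelineCompletionNotification. Assumption: DONE, SKIPPED and FAILED
// are terminal, so a plan with a failed task is complete but not successful.
func shouldClaimCompletion(statuses []taskStatus) bool {
	allComplete, hasFailures := true, false
	for _, s := range statuses {
		switch s {
		case statusDone, statusSkipped:
			// terminal and successful: nothing to record
		case statusFailed:
			hasFailures = true
		default:
			allComplete = false
		}
	}
	return allComplete && !hasFailures
}

func main() {
	fmt.Println(shouldClaimCompletion([]taskStatus{statusDone, statusSkipped})) // true
	fmt.Println(shouldClaimCompletion([]taskStatus{statusDone, statusRunning})) // false
	fmt.Println(shouldClaimCompletion([]taskStatus{statusDone, statusFailed}))  // false
}
```

Because a plan with a failed task returns before the claim, it never inserts a PIPELINE_COMPLETED row; after BatchRunTasks deletes the PIPELINE_FAILED row, a successful retry can still claim completion. Note that in this sketch an empty task list counts as complete.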
Step 2: Build to verify
Run: go build ./backend/runner/taskrun/...
Expected: Build succeeds
Step 3: Commit pipeline completion webhook
```bash
git add backend/runner/taskrun/scheduler.go
git commit -m "feat: add atomic claim for PIPELINE_COMPLETED webhook

Prevent duplicate completion webhooks in HA deployments using database-backed
claim mechanism.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Files:
- backend/component/state/state.go
- backend/api/v1/rollout_service.go
- backend/runner/taskrun/running_scheduler.go
- backend/runner/taskrun/scheduler.go

Step 1: Remove channel from state
In backend/component/state/state.go, remove these lines (around lines 23-24):
```go
// DELETE these lines:
// TaskSkippedOrDoneChan is the channel for notifying the task is skipped or done.
TaskSkippedOrDoneChan chan int
```
And in the New() function (around line 37):
```go
// DELETE this line:
TaskSkippedOrDoneChan: make(chan int, 1000),
```
Step 2: Remove sender in rollout_service.go
In backend/api/v1/rollout_service.go, find and remove (around line 836):
```go
// DELETE these lines:
for _, task := range tasksToSkip {
	s.stateCfg.TaskSkippedOrDoneChan <- task.ID
}
```
Step 3: Remove sender in running_scheduler.go
In backend/runner/taskrun/running_scheduler.go, find and remove (around line 232):
```go
// DELETE this line:
s.stateCfg.TaskSkippedOrDoneChan <- task.ID
```
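Step 4: Remove receiver in scheduler.go
In backend/runner/taskrun/scheduler.go, find the runTaskCompletionListener function (around lines 82-110).
Remove the entire case statement. This case block is the only caller of checkPlanCompletion shown in this plan, so before deleting it, confirm that checkPlanCompletion(ctx, task.PlanID) is still invoked after a task finishes or is skipped; otherwise PIPELINE_COMPLETED is never sent: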
```go
// DELETE this entire case block:
case taskUID := <-s.stateCfg.TaskSkippedOrDoneChan:
	if err := func() error {
		task, err := s.store.GetTaskByID(ctx, taskUID)
		if err != nil {
			return errors.Wrapf(err, "failed to get task")
		}
		// Check if entire plan is complete and handle webhooks
		s.checkPlanCompletion(ctx, task.PlanID)
		return nil
	}(); err != nil {
		slog.Error("failed to handle task completion", log.BBError(err))
	}
```
Step 5: Build to verify
Run: go build ./backend/...
Expected: Build succeeds
Step 6: Commit channel removal
```bash
git add backend/component/state/state.go backend/api/v1/rollout_service.go backend/runner/taskrun/running_scheduler.go backend/runner/taskrun/scheduler.go
git commit -m "refactor: remove TaskSkippedOrDoneChan in-memory channel

Remove in-memory channel replaced by database-backed webhook delivery.
TaskSkippedOrDoneChan is no longer needed as webhooks are triggered directly
in task failure and completion handlers.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Step 1: Start local Bytebase
Run: PG_URL=postgresql://bbdev@localhost/bbdev go run ./backend/bin/server/main.go --port 8080 --data . --debug
Expected: Server starts, migration runs successfully
Step 2: Verify table created
Run: psql -U bbdev bbdev -c "\d plan_webhook_delivery"
Expected: See table schema with plan_id PRIMARY KEY
Step 3: Test PIPELINE_FAILED webhook
```sql
SELECT * FROM plan_webhook_delivery;
```
Expected: One row with event_type='PIPELINE_FAILED'
Step 4: Test retry doesn't send duplicate
```sql
SELECT COUNT(*) FROM plan_webhook_delivery WHERE plan_id = ?;
```
Expected: Still 1 row
Step 5: Test BatchRunTasks resets state
```sql
SELECT * FROM plan_webhook_delivery WHERE plan_id = ?;
```
Expected: 0 rows (deleted)
```sql
SELECT COUNT(*) FROM plan_webhook_delivery WHERE plan_id = ?;
```
Expected: 1 row
Step 6: Test PIPELINE_COMPLETED
```sql
SELECT * FROM plan_webhook_delivery WHERE plan_id = ?;
```
Expected: One row with event_type='PIPELINE_COMPLETED'
Step 7: Document test results
Create test notes in commit message for final commit.
Files:
- backend/migrator/migrator_test.go

Step 1: Find TestLatestVersion
In backend/migrator/migrator_test.go, find TestLatestVersion function.
Step 2: Update latest version
Find the line that looks like:
```go
latestSchemaVersion := "3.14.15"
```
Increment the last number:
```go
latestSchemaVersion := "3.14.16"
```
Step 3: Run test
Run: go test -v ./backend/migrator/ -run TestLatestVersion
Expected: Test passes
Step 4: Commit migrator test update
```bash
git add backend/migrator/migrator_test.go
git commit -m "test: update latest schema version to 3.14.16

Update for plan_webhook_delivery table migration.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
Step 1: Format Go code
Run: gofmt -w backend/store/plan_webhook_delivery.go backend/api/v1/rollout_service.go backend/runner/taskrun/scheduler.go backend/runner/taskrun/running_scheduler.go backend/component/state/state.go
Expected: Files formatted
Step 2: Run golangci-lint
Run: golangci-lint run --allow-parallel-runners
Expected: No errors. golangci-lint may cap the number of issues it reports per run, so rerun it after each round of fixes until the output is clean.
Step 3: Auto-fix lint issues
Run: golangci-lint run --fix --allow-parallel-runners
Expected: Auto-fixable issues resolved
Step 4: Build backend
Run: go build -ldflags "-w -s" -p=16 -o ./bytebase-build/bytebase ./backend/bin/server/main.go
Expected: Build succeeds
Step 5: Commit any lint fixes
```bash
git add -A
git commit -m "chore: fix linting issues

Auto-fix and manual lint issue resolution.

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
- plan_webhook_delivery table
- TaskSkippedOrDoneChan removed