docs/plans/2025-12-31-ha-webhook-delivery-design.md
Date: 2025-12-31 Status: Design Author: Danny
This design makes pipeline webhook notifications (PIPELINE_FAILED and PIPELINE_COMPLETED) work correctly in High Availability (HA) deployments where multiple Bytebase replicas run simultaneously. It replaces the in-memory TaskSkippedOrDoneChan channel with a database-backed idempotent delivery system.
The current webhook notification system uses an in-memory channel (state.TaskSkippedOrDoneChan) which breaks in HA:
// backend/component/state/state.go
//
// Current (pre-design) state: a process-local channel carries task
// skipped/done notifications. This is the component this design removes —
// see the cleanup section at the end.
type State struct {
	TaskSkippedOrDoneChan chan int // In-memory, not shared across replicas
}
// backend/runner/taskrun/running_scheduler.go
s.stateCfg.TaskSkippedOrDoneChan <- task.ID // Send notification
// backend/runner/taskrun/scheduler.go
case taskUID := <-s.stateCfg.TaskSkippedOrDoneChan: // Receive notification
s.checkPlanCompletion(ctx, task.PlanID)
HA Issues: the channel is not shared across replicas, so a notification sent on one replica is invisible to the others (causing missed or duplicated webhooks), and it has no persistence, so pending notifications are lost on restart.
Use a single database table with PRIMARY KEY constraint to ensure exactly-once delivery across HA replicas.
PIPELINE_FAILED and PIPELINE_COMPLETED are mutually exclusive per plan.
-- Tracks webhook delivery for pipeline events (PIPELINE_FAILED or PIPELINE_COMPLETED).
-- One row per plan at any time - mutually exclusive events.
-- Row is deleted when user clicks BatchRunTasks to reset notification state.
CREATE TABLE webhook_delivery_log (
plan_id BIGINT PRIMARY KEY REFERENCES plan(id),
-- Event type: 'PIPELINE_FAILED' or 'PIPELINE_COMPLETED'
event_type TEXT NOT NULL,
delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Design choices:
- plan_id as PRIMARY KEY enforces uniqueness automatically
- event_type provides an audit trail (debugging which event was sent)

Location: backend/runner/taskrun/running_scheduler.go
// Add after task run status is marked as FAILED
// handleTaskRunFailure sends the PIPELINE_FAILED webhook for the plan that
// owns taskRun, at most once per plan across all HA replicas.
//
// Exactly-once delivery comes from ClaimPipelineFailureNotification, which
// inserts into webhook_delivery_log under a PRIMARY KEY on plan_id: only the
// replica whose INSERT succeeds proceeds to send the webhook.
//
// If any step AFTER a successful claim fails, the claim is released again so
// the notification is not silently lost — without this, an error while
// fetching the failed tasks, plan, or project would leave the claim row in
// place and no webhook would ever be delivered for this failure phase.
//
// Called after the task run status has been marked as FAILED. All errors are
// logged and swallowed because notification delivery is best-effort.
func (s *Scheduler) handleTaskRunFailure(ctx context.Context, taskRun *store.TaskRunMessage) {
	task, err := s.store.GetTaskByID(ctx, taskRun.TaskID)
	if err != nil {
		slog.Error("failed to get task for failure notification", log.BBError(err))
		return
	}

	// Try to claim notification (only one replica succeeds).
	claimed, err := s.store.ClaimPipelineFailureNotification(ctx, task.PlanID)
	if err != nil {
		slog.Error("failed to claim pipeline failure notification", log.BBError(err))
		return
	}
	if !claimed {
		// Already sent by this or another replica.
		return
	}

	// We hold the claim from here on; release it on any error below so a
	// subsequent failure event for this plan can claim and deliver it.
	releaseClaim := func() {
		if err := s.store.ResetPipelineNotification(ctx, task.PlanID); err != nil {
			slog.Error("failed to release pipeline failure notification claim", log.BBError(err))
		}
	}

	// Gather all failed task runs so the webhook lists every failure.
	failures, err := s.getFailedTaskRuns(ctx, task.PlanID)
	if err != nil {
		slog.Error("failed to get failed task runs", log.BBError(err))
		releaseClaim()
		return
	}

	// Plan and project provide the webhook routing and payload context.
	plan, err := s.store.GetPlan(ctx, &store.FindPlanMessage{UID: &task.PlanID})
	if err != nil || plan == nil {
		slog.Error("failed to get plan for failure webhook", log.BBError(err))
		releaseClaim()
		return
	}
	project, err := s.store.GetProject(ctx, &store.FindProjectMessage{ResourceID: &plan.ProjectID})
	if err != nil || project == nil {
		slog.Error("failed to get project for failure webhook", log.BBError(err))
		releaseClaim()
		return
	}

	// Send webhook.
	s.webhookManager.CreateEvent(ctx, &webhook.Event{
		Type:    storepb.Activity_PIPELINE_FAILED,
		Project: webhook.NewProject(project),
		PipelineFailed: &webhook.EventPipelineFailed{
			Rollout:     webhook.NewRollout(plan),
			FailedTasks: failures,
		},
	})
}
Location: backend/runner/taskrun/scheduler.go (existing checkPlanCompletion function)
// checkPlanCompletion sends the PIPELINE_COMPLETED webhook once every task in
// the plan is done and none failed, at most once per plan across HA replicas.
//
// Exactly-once delivery is enforced by ClaimPipelineCompletionNotification,
// which inserts into webhook_delivery_log under a plan_id PRIMARY KEY; only
// the replica whose INSERT succeeds sends the webhook.
//
// NOTE(review): this is a design sketch. `allComplete`, `hasFailures`,
// `plan`, and `project` are defined by the elided existing code (the "..."
// placeholders) — confirm against the real scheduler.go when implementing.
func (s *Scheduler) checkPlanCompletion(ctx context.Context, planID int64) {
	// ... existing task completion check logic ...
	if !allComplete {
		return
	}
	if hasFailures {
		return // Don't send completion if there were failures
	}
	// Try to claim completion notification (HA-safe)
	claimed, err := s.store.ClaimPipelineCompletionNotification(ctx, planID)
	if err != nil {
		slog.Error("failed to claim pipeline completion notification", log.BBError(err))
		return
	}
	if !claimed {
		return // Already sent
	}
	// ... existing webhook send code ...
	s.webhookManager.CreateEvent(ctx, &webhook.Event{
		Type:    storepb.Activity_PIPELINE_COMPLETED,
		Project: webhook.NewProject(project),
		RolloutCompleted: &webhook.EventRolloutCompleted{
			Rollout: webhook.NewRollout(plan),
		},
	})
}
Location: backend/api/v1/rollout_service.go (existing BatchRunTasks function)
// BatchRunTasks (existing endpoint) gains one step: before creating new task
// runs, it deletes the plan's webhook_delivery_log row so the user receives a
// fresh PIPELINE_FAILED/PIPELINE_COMPLETED notification for the retry.
//
// NOTE(review): design sketch — `planID` is produced by the elided validation
// code (the "..." placeholders); confirm against the real rollout_service.go.
func (s *RolloutService) BatchRunTasks(ctx context.Context, req *connect.Request[v1pb.BatchRunTasksRequest]) (*connect.Response[v1pb.BatchRunTasksResponse], error) {
	// ... existing validation code ...
	// Reset notification state so user gets fresh feedback on retry
	if err := s.store.ResetPipelineNotification(ctx, planID); err != nil {
		slog.Error("failed to reset pipeline notification", log.BBError(err))
		// Don't fail the request - notification is non-critical
	}
	// ... existing CreatePendingTaskRuns call ...
	return connect.NewResponse(&v1pb.BatchRunTasksResponse{}), nil
}
Location: backend/store/webhook_delivery_log.go (new file)
package store
import (
	"context"
	"database/sql"
	"errors"
)
// ResetPipelineNotification deletes the notification log for a plan.
// Called when user clicks BatchRunTasks to enable new notifications on retry.
// ResetPipelineNotification removes the webhook delivery record for a plan,
// re-arming notifications. Invoked from BatchRunTasks so a retry produces a
// fresh PIPELINE_FAILED or PIPELINE_COMPLETED webhook.
func (s *Store) ResetPipelineNotification(ctx context.Context, planID int64) error {
	const stmt = `DELETE FROM webhook_delivery_log WHERE plan_id = $1`
	if _, err := s.db.ExecContext(ctx, stmt, planID); err != nil {
		return err
	}
	return nil
}
// ClaimPipelineFailureNotification attempts to claim the right to send PIPELINE_FAILED webhook.
// Returns true if claimed (should send), false if already sent or claimed by another replica.
// HA-safe: PRIMARY KEY constraint prevents duplicate sends across replicas.
// ClaimPipelineFailureNotification attempts to claim the right to send the
// PIPELINE_FAILED webhook for the given plan.
//
// Returns true if this caller won the claim and should send the webhook, and
// false if a row already exists (already sent, or claimed by another replica).
// HA-safe: the PRIMARY KEY on plan_id guarantees at most one INSERT succeeds
// across all replicas; losers see ON CONFLICT DO NOTHING return zero rows.
func (s *Store) ClaimPipelineFailureNotification(ctx context.Context, planID int64) (bool, error) {
	query := `
		INSERT INTO webhook_delivery_log (plan_id, event_type)
		VALUES ($1, 'PIPELINE_FAILED')
		ON CONFLICT (plan_id) DO NOTHING
		RETURNING plan_id
	`
	var id int64
	err := s.db.QueryRowContext(ctx, query, planID).Scan(&id)
	// errors.Is (not ==) so driver-wrapped sentinel errors are still detected.
	if errors.Is(err, sql.ErrNoRows) {
		// ON CONFLICT DO NOTHING produced no row: already claimed.
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}
// ClaimPipelineCompletionNotification attempts to claim the right to send PIPELINE_COMPLETED webhook.
// Returns true if claimed (should send), false if already sent or claimed by another replica.
// HA-safe: PRIMARY KEY constraint prevents duplicate sends across replicas.
// ClaimPipelineCompletionNotification attempts to claim the right to send the
// PIPELINE_COMPLETED webhook for the given plan.
//
// Returns true if this caller won the claim and should send the webhook, and
// false if a row already exists (already sent, or claimed by another replica).
// HA-safe: the PRIMARY KEY on plan_id guarantees at most one INSERT succeeds
// across all replicas; losers see ON CONFLICT DO NOTHING return zero rows.
func (s *Store) ClaimPipelineCompletionNotification(ctx context.Context, planID int64) (bool, error) {
	query := `
		INSERT INTO webhook_delivery_log (plan_id, event_type)
		VALUES ($1, 'PIPELINE_COMPLETED')
		ON CONFLICT (plan_id) DO NOTHING
		RETURNING plan_id
	`
	var id int64
	err := s.db.QueryRowContext(ctx, query, planID).Scan(&id)
	// errors.Is (not ==) so driver-wrapped sentinel errors are still detected.
	if errors.Is(err, sql.ErrNoRows) {
		// ON CONFLICT DO NOTHING produced no row: already claimed.
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}
Initial deployment (all tasks attempt 0):
→ Task A fails
→ Replica 1: ClaimPipelineFailureNotification(plan=123)
→ INSERT succeeds → Send webhook: "Pipeline failed - Task A failed"
→ Task B fails
→ Replica 2: ClaimPipelineFailureNotification(plan=123)
→ ON CONFLICT (row already exists) → Skip, no duplicate webhook
Result: One webhook sent, no spam ✅
Database: (plan_id=123, event_type='PIPELINE_FAILED')
User clicks "Retry Tasks":
→ BatchRunTasks API called
→ DELETE FROM webhook_delivery_log WHERE plan_id=123
→ CreatePendingTaskRuns (creates attempt=1 for failed tasks)
Database: (empty - row deleted)
→ Task A succeeds (attempt=1)
→ Task B fails (attempt=1)
→ Replica 1: ClaimPipelineFailureNotification(plan=123)
→ INSERT succeeds (no conflict, row was deleted)
→ Send webhook: "Pipeline failed on retry - Task B failed"
Result: User gets feedback on retry ✅
Database: (plan_id=123, event_type='PIPELINE_FAILED')
User retries again:
→ BatchRunTasks called
→ DELETE FROM webhook_delivery_log WHERE plan_id=123
→ All tasks succeed
→ checkPlanCompletion detects all tasks done, no failures
→ Replica 2: ClaimPipelineCompletionNotification(plan=123)
→ INSERT succeeds
→ Send webhook: "Pipeline completed successfully"
Result: Completion notification sent ✅
Database: (plan_id=123, event_type='PIPELINE_COMPLETED')
Two replicas detect same task failure simultaneously:
→ Replica A: ClaimPipelineFailureNotification(plan=123)
→ INSERT INTO webhook_delivery_log VALUES (123, 'PIPELINE_FAILED')
→ Returns plan_id=123 → claimed=true → Sends webhook
→ Replica B: ClaimPipelineFailureNotification(plan=123) (100ms later)
→ INSERT INTO webhook_delivery_log VALUES (123, 'PIPELINE_FAILED')
→ ON CONFLICT (plan_id) DO NOTHING
→ Returns no rows → claimed=false → Skips webhook
Result: Only one webhook sent across HA replicas ✅
Create migration file: backend/migrator/migration/<<version>>/<<sequence>>##webhook_delivery_log.sql
CREATE TABLE webhook_delivery_log (
plan_id BIGINT PRIMARY KEY REFERENCES plan(id),
event_type TEXT NOT NULL,
delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
After implementation, remove in-memory channel:
Remove from state:
// backend/component/state/state.go
// DELETE: TaskSkippedOrDoneChan chan int
Remove senders:
// backend/api/v1/rollout_service.go
// DELETE: s.stateCfg.TaskSkippedOrDoneChan <- task.ID
// backend/runner/taskrun/running_scheduler.go
// DELETE: s.stateCfg.TaskSkippedOrDoneChan <- task.ID
Remove receiver:
// backend/runner/taskrun/scheduler.go
// DELETE: case taskUID := <-s.stateCfg.TaskSkippedOrDoneChan:
✅ HA-Compatible: Database PRIMARY KEY prevents duplicate webhooks across replicas
✅ No Spam: One notification per failure/completion phase
✅ Retry Support: DELETE on BatchRunTasks resets state for fresh notifications
✅ Immediate Delivery: No polling or aggregation windows needed
✅ Simple Schema: Single table with PRIMARY KEY, no complex indexes
✅ Audit Trail: event_type shows which notification was sent
✅ Follows Existing Pattern: Same approach as TaskRun session monitoring (commit 564f1e5618)
❌ Complex: Needs polling worker, window management
❌ Delayed: 5-minute wait before notification
❌ Race-prone: Multiple replicas might process same window
❌ Lock held during webhook HTTP call (can timeout)
❌ More complex than simple INSERT/DELETE
❌ Breaks in HA - not shared across replicas
❌ No persistence - lost on restart
✅ Simple INSERT/DELETE operations
✅ Immediate notifications
✅ HA-safe with database constraints
✅ Stateless - any replica can handle requests
- retry_count column to track how many times plan was retried
- failed_task_count to show aggregated metrics