docs/plans/2025-12-30-taskrun-session-ha-design.md
Replace in-memory TaskRunConnectionID state with database-native connection identification using application_name. This makes session monitoring work in High Availability (HA) deployments where multiple Bytebase replicas run simultaneously.
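The current implementation stores connection IDs in `state.TaskRunConnectionID` (a `sync.Map`), which breaks in HA: the map lives only in the memory of the replica executing the task run, so a session request routed to any other replica finds no connection ID.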
Use PostgreSQL's application_name parameter to make connections self-identifying. Query pg_stat_activity by application_name instead of connection ID.
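Naming format: `bytebase-taskrun-{taskRunUID}`

Examples:

- `bytebase-taskrun-12345`: connection executing task run 12345
- `bytebase`: all other connections (existing behavior)

The name is visible as `application_name` in the `pg_stat_activity` view.

Add `TaskRunUID` to the connection context (`backend/plugin/db/driver.go`):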
```go
type ConnectionContext struct {
	EnvironmentID string
	InstanceID    string
	EngineVersion string
	TenantMode    bool
	DatabaseName  string
	DataShare     bool
	ReadOnly      bool
	MessageBuffer []*v1pb.QueryResult_Message
	TaskRunUID    *int // NEW: set when executing a task run
}
```
Modify `backend/plugin/db/pg/pg.go` (line 79).

Current:

```go
pgxConnConfig.RuntimeParams["application_name"] = "bytebase"
```

New:

```go
appName := "bytebase"
if config.ConnectionContext.TaskRunUID != nil {
	appName = fmt.Sprintf("bytebase-taskrun-%d", *config.ConnectionContext.TaskRunUID)
}
pgxConnConfig.RuntimeParams["application_name"] = appName
```
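Because `pg.go` builds the name and `rollout_service.go` rebuilds it independently with `fmt.Sprintf`, the two format strings must stay in sync. A minimal sketch of a shared helper, assuming it lives in a package both can import; `TaskRunApplicationName`, `ParseTaskRunApplicationName`, and the constants are hypothetical names, not existing Bytebase code. The length constant reflects PostgreSQL truncating `application_name` to fewer than `NAMEDATALEN` (64 by default) bytes.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical constants; the real values would live wherever pg.go and
// rollout_service.go can both import them.
const (
	defaultApplicationName       = "bytebase"
	taskRunApplicationNamePrefix = "bytebase-taskrun-"
	// NAMEDATALEN-1 in a default PostgreSQL build; longer values are truncated.
	maxApplicationNameLen = 63
)

// TaskRunApplicationName (hypothetical) returns the application_name for a
// connection. A nil taskRunUID keeps the existing "bytebase" value.
func TaskRunApplicationName(taskRunUID *int) string {
	if taskRunUID == nil {
		return defaultApplicationName
	}
	return fmt.Sprintf("%s%d", taskRunApplicationNamePrefix, *taskRunUID)
}

// ParseTaskRunApplicationName (hypothetical) reverses TaskRunApplicationName.
// It reports false for names that do not belong to a task run connection.
func ParseTaskRunApplicationName(name string) (int, bool) {
	rest, ok := strings.CutPrefix(name, taskRunApplicationNamePrefix)
	if !ok {
		return 0, false
	}
	uid, err := strconv.Atoi(rest)
	if err != nil {
		return 0, false
	}
	return uid, true
}
```

With a helper like this, `pg.go` would pass `config.ConnectionContext.TaskRunUID` straight through and `rollout_service.go` would call it with `&taskRunUID`, so the format is defined in one place.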
Pass `TaskRunUID` when creating the driver (`backend/runner/taskrun/executor.go`):

```go
driver, err := mc.dbFactory.GetAdminDatabaseDriver(ctx, mc.instance, mc.database, db.ConnectionContext{
	EnvironmentID: mc.database.EnvironmentID,
	InstanceID:    mc.instance.ResourceID,
	DatabaseName:  mc.database.DatabaseName,
	TaskRunUID:    &mc.taskRunUID, // NEW
})
```
Update `backend/api/v1/rollout_service.go`.

Before:

- Load the connection ID with `stateCfg.TaskRunConnectionID.Load(taskRunUID)`
- Query `pg_stat_activity WHERE pid = $1`

After:

- Build the name with `appName := fmt.Sprintf("bytebase-taskrun-%d", taskRunUID)`
- Query `pg_stat_activity WHERE application_name = $1`

Query Changes:
```sql
-- Find main session and blocking/blocked sessions
SELECT
    pid,
    pg_blocking_pids(pid) AS blocked_by_pids,
    query,
    state,
    wait_event_type,
    wait_event,
    datname,
    usename,
    application_name,
    client_addr,
    client_port,
    backend_start,
    xact_start,
    query_start
FROM
    pg_catalog.pg_stat_activity
WHERE application_name = $1
   OR pid = ANY(pg_blocking_pids((SELECT pid FROM pg_stat_activity WHERE application_name = $1 LIMIT 1)))
   OR (SELECT pid FROM pg_stat_activity WHERE application_name = $1 LIMIT 1) = ANY(pg_blocking_pids(pid))
ORDER BY pid
```
Identify main session by comparing application_name instead of pid.
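The three `OR` branches of the query return the main session, the sessions blocking it, and the sessions it blocks, all in one result set. A minimal sketch of how those rows might be split afterwards, keyed on `application_name` rather than `pid`. The `sessionRow` and `classifiedSessions` types and the `classifySessions` function are hypothetical, and the sketch assumes `blocked_by_pids` has already been scanned into a `[]int`.

```go
package main

import "slices"

// sessionRow (hypothetical) holds the pg_stat_activity columns this sketch needs.
type sessionRow struct {
	PID             int
	BlockedByPIDs   []int
	ApplicationName string
}

// classifiedSessions (hypothetical) groups rows around the task run's session.
type classifiedSessions struct {
	Main     *sessionRow
	Blocking []sessionRow // sessions the main session is waiting on
	Blocked  []sessionRow // sessions waiting on the main session
}

// classifySessions picks the first row (rows arrive ordered by pid) whose
// application_name matches appName as the main session, then sorts the
// remaining rows by their lock relationship to it.
func classifySessions(rows []sessionRow, appName string) classifiedSessions {
	var out classifiedSessions
	for i := range rows {
		if rows[i].ApplicationName == appName {
			out.Main = &rows[i]
			break
		}
	}
	if out.Main == nil {
		return out // task run has no live connection
	}
	for _, r := range rows {
		if r.PID == out.Main.PID {
			continue
		}
		if slices.Contains(out.Main.BlockedByPIDs, r.PID) {
			out.Blocking = append(out.Blocking, r)
		}
		if slices.Contains(r.BlockedByPIDs, out.Main.PID) {
			out.Blocked = append(out.Blocked, r)
		}
	}
	return out
}
```

Keying on `application_name` lets the handler find the main session without ever having stored its `pid`; the `pid` is only read from the result afterwards to relate the other rows to it.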
Delete from `backend/component/state/state.go`:

```go
TaskRunConnectionID sync.Map // map[taskRunID]string
```

Delete from `ExecuteOptions` in `backend/plugin/db/driver.go`:

```go
SetConnectionID    func(id string)
DeleteConnectionID func()
```
Delete from `backend/runner/taskrun/executor.go`:

```go
opts.SetConnectionID = func(id string) {
	stateCfg.TaskRunConnectionID.Store(mc.taskRunUID, id)
}
opts.DeleteConnectionID = func() {
	stateCfg.TaskRunConnectionID.Delete(mc.taskRunUID)
}
```
Delete from `backend/plugin/db/pg/pg.go` (2 occurrences):

```go
if opts.SetConnectionID != nil {
	var pid string
	if err := conn.QueryRowContext(ctx, "SELECT pg_backend_pid()").Scan(&pid); err != nil {
		return 0, errors.Wrapf(err, "failed to get connection id")
	}
	opts.SetConnectionID(pid)
	if opts.DeleteConnectionID != nil {
		defer opts.DeleteConnectionID()
	}
}
```
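Engine support:

- PostgreSQL and CockroachDB: supported via `application_name`
- SQL Server: has an equivalent `app name` connection parameter, but support is deferred to future work

For unsupported engines, `GetTaskRunSession()` returns:

`"session monitoring is only supported for PostgreSQL and CockroachDB"`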
| File | Changes |
|---|---|
| `backend/plugin/db/driver.go` | Add `TaskRunUID *int` to `ConnectionContext`; remove `SetConnectionID` and `DeleteConnectionID` from `ExecuteOptions` |
| `backend/plugin/db/pg/pg.go` | Set `application_name` based on `TaskRunUID`; remove the `pg_backend_pid()` lookup |
| `backend/runner/taskrun/executor.go` | Pass `TaskRunUID` in `ConnectionContext`; remove the connection ID callbacks |
| `backend/api/v1/rollout_service.go` | Query by `application_name` instead of loading from state |
| `backend/component/state/state.go` | Remove `TaskRunConnectionID` field |
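Implementation steps:

1. Add `TaskRunUID` to `ConnectionContext`
2. Set `application_name` in the PostgreSQL driver
3. Pass `TaskRunUID` from the task run executor
4. Update the `GetTaskRunSession()` API to query by `application_name`

Manual Testing: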
- Call `GetTaskRunSession()` during execution from any replica
- For an unsupported engine, verify `GetTaskRunSession()` returns the "not supported" error

Automated Testing:
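- Verify the `application_name` format matches `bytebase-taskrun-{uid}`
- Run a task, query `pg_stat_activity`, and verify the app name appears

Benefits:

- Any replica can serve `GetTaskRunSession()` requests
- Task run connections can be identified directly in `pg_stat_activity` without API calls

Future work:

- SQL Server support via the `app name` parameter and `sys.dm_exec_sessions`

No breaking changes: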
- The `TaskRunUID` field is optional (pointer)
- Connections without a task run keep `application_name = "bytebase"`