docs/tech-notes/jobs.md
Original authors: Michael Butler & Shiranka Miskin (November 2021)
Jobs are CockroachDB's way of representing long-running background tasks. They are used internally as a core part of various features of CockroachDB such as Changefeeds, Backups, and Schema Changes. Progress is regularly persisted so that if a node fails or the job is paused, another node is still able to resume the work later on.
Users can send PAUSE/RESUME/CANCEL commands to specific jobs via SQL
commands such as PAUSE JOB {job_id} for a single job, to an arbitrary set of
jobs with commands such as CANCEL JOBS {select_clause}, to specific job types
through commands such as RESUME ALL CHANGEFEED JOBS, or to jobs triggered by a
given schedule through commands such as PAUSE JOBS FOR SCHEDULE {schedule_id}.
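For example (the job and schedule ids below are placeholders):

```sql
PAUSE JOB 12345;               -- a single job
CANCEL JOBS (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'maxroach');
RESUME ALL CHANGEFEED JOBS;    -- every job of a given type
PAUSE JOBS FOR SCHEDULE 67890; -- all jobs created by a schedule
```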
Commands to specific job ids are handled through
controlJobsNode.startExec.
PAUSE and CANCEL commands move the job to a pause-requested and
cancel-requested state respectively, which allows the node currently running
the job to proceed with actually pausing or cancelling it (see jobs/cancel in
Job Management). RESUME moves the job to a running state
to be picked up by any node (see jobs/adopt in Job
Management).
The batch commands based on a schedule / type are simply delegated to the
respective {cmd} JOBS command via
delegateJobControl.
A Job is represented as a row in the system.jobs table, which is the single
source of truth for individual job information. Nodes read this table to
determine the jobs they can execute, using the payload to determine how to
execute them and the progress to pick up from where other nodes may have left
off. Completed/cancelled jobs are eventually garbage collected from
system.jobs.
```sql
CREATE TABLE system.jobs (
    id INT8 DEFAULT unique_rowid(),
    created TIMESTAMP NOT NULL DEFAULT now(),
    -- States such as "pending", "running", "paused", "pause-requested", etc.
    status STRING NOT NULL,
    -- Information specific to each type of job (ex: Changefeed, Backup).
    payload BYTES NOT NULL, -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload)
    progress BYTES,         -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress)
    -- Used to track which node currently owns execution of the job.
    claim_session_id BYTES,
    claim_instance_id INT8,
    -- The system that created the job and the id relevant to that system
    -- (ex: crdb_schedule and the schedule_id).
    created_by_type STRING,
    created_by_id INT,
    -- Useful for observability.
    num_runs INT8,
    last_run TIMESTAMP,
    ... -- constraint/index/family
)
```
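As the comments above note, the protobuf columns can be decoded directly from SQL; for example:

```sql
SELECT id, status,
       crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload) AS payload
FROM system.jobs
LIMIT 1;
```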
Jobs are primarily specified and differentiated through their payload column,
which holds bytes of type jobspb.Payload storing information related to the
type, parameters, and metadata of the job.
```protobuf
message Payload {
  // General information relevant to any job
  string description = 1;
  repeated string statement = 16;
  int64 started_micros = 3;
  int64 finished_micros = 4;
  ...
  repeated errorspb.EncodedError resume_errors = 17;
  repeated errorspb.EncodedError cleanup_errors = 18;
  errorspb.EncodedError final_resume_error = 19;
  ...
  // Type-specific details that mark the type of the job and store
  // type-specific information
  oneof details {
    BackupDetails backup = 10;
    RestoreDetails restore = 11;
    ... // Details for the many other types of jobs
  }
  ...
}
```
The progress of the specified job through its lifecycle is tracked in a separate
progress column of type
jobspb.Progress.
```protobuf
message Progress {
  // Generic information for observability
  oneof progress {
    float fraction_completed = 1;
    util.hlc.Timestamp high_water = 3;
  }
  string running_status = 4;
  // Type-specific progress information similar to Payload details
  oneof details { // Note that while this is also called details, it does not take {JobType}Details structs like Payload
    BackupProgress backup = 10;
    RestoreProgress restore = 11;
    ... // Progress for the many other types of jobs
  }
  ...
}
```
A more user-readable version of system.jobs with the parsed payload and
progress information can be monitored through the
crdb_internal.jobs
table (which simply reads from system.jobs). The SHOW JOBS command
operates by reading from
crdb_internal.jobs.
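For example, a quick way to check on currently running jobs (the columns shown are a subset of what crdb_internal.jobs exposes):

```sql
SELECT job_id, job_type, status, fraction_completed
FROM crdb_internal.jobs
WHERE status = 'running';
```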
A Job is created when a new row is inserted into system.jobs. Once written, it
is able to be "claimed" by any node in the cluster (asserting that the node is
responsible for executing the job) through the setting of claim_session_id and
claim_instance_id in the job record. Job execution ends when the status changes
from running to succeeded, cancelled, or failed. The job record eventually gets
deleted from the system table -- by default, 14 days after execution ends.
If the claiming node fails to completely execute the job (whether because the
node failed or the job was paused), the job record remains in the table and
can be resumed by any node in the cluster. The mechanism by which this is done
is the JobRegistry.
A node interacts with the jobs table through the JobRegistry
struct.
A node creates a job by calling the Registry's CreateJobWithTxn or
CreateAdoptableJobWithTxn, which INSERTs a new row into system.jobs. The node
passes the job-specific information to these functions via the Record struct.
If the node created the job with CreateJobWithTxn, it will also claim the job
by setting the claim IDs and start the job. By contrast,
CreateAdoptableJobWithTxn leaves the job unclaimed so that any node can adopt
and resume it (startableJob.Start() is never called on these jobs).
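As a rough sketch, the creation path looks something like the following; the Record fields and function signatures shown follow pkg/jobs at the time of writing and may have drifted since:

```go
// A hedged sketch of queueing an adoptable job inside a transaction.
record := jobs.Record{
	Description: "BACKUP DATABASE foo",   // human-readable description
	Username:    security.RootUserName(), // user the job runs as
	Details:     jobspb.BackupDetails{},  // sets the Payload details oneof, and thus the job type
	Progress:    jobspb.BackupProgress{}, // initial (empty) Progress details
}
jobID := registry.MakeJobID()
// Written with the caller's txn; any node may later adopt and resume it.
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, txn)
```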
While job creation is triggered in response to client queries such as
BACKUP, job adoption, cancellation, and deletion are managed
through daemon goroutines. These daemon goroutines continually run
on each node, allowing any node to participate in picking up work, unassigning
jobs that were on nodes that failed, and cleaning up old job records from the
table.
These goroutines begin when each node's SQL Server
starts,
initializing the JobRegistry via
jobRegistry.Start(...).
Specifically, jobRegistry.Start() kicks off the following goroutines:
jobs/adopt: At the jobs.registry.interval.adopt interval (default 30s), call
claimJobs to query system.jobs and attempt to UPDATE up to
$COCKROACH_JOB_ADOPTIONS_PER_PERIOD (default 10) rows without an assigned
claim_session_id, setting the current node's session and instance IDs. After
claiming new jobs, the node resumes them.
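Conceptually, the claim UPDATE looks something like this (the actual statement built by claimJobs adds status predicates and ordering, so treat this as a simplification):

```sql
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE claim_session_id IS NULL
LIMIT $3 -- $COCKROACH_JOB_ADOPTIONS_PER_PERIOD
RETURNING id;
```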
jobs/cancel
: At the
jobs.registry.interval.cancel
interval (default 10s), set claim_session_id = NULL
for up to
jobs.cancel_update_limit
jobs with session IDs that fail a SQL Liveness
check.
See the SQL Liveness RFC
for more information on liveness and claim IDs. Jobs claimed by the current node in the pause-requested
and cancel-requested states are also
transitioned
to the PAUSED and REVERTING states by the same daemon
goroutine.
jobs/gc: At the jobs.registry.interval.gc interval (default 1h), query for
jobs with a status of Succeeded/Cancelled/Failed that stopped running more
than jobs.retention_time (default 2 weeks) ago and DELETE them. Because this
finish time is stored in finished_micros of the Payload protobuf and cannot be
read directly by SQL, up to 100 job records are SELECTed at a time and their
payloads unmarshalled to filter on the finished time.

Each job type registers its respective execution logic through
jobs.RegisterConstructor
which globally registers a
Constructor
function for a given
jobspb.Type.
This Constructor returns an implementation of
jobs.Resumer
with the Resume and OnFailOrCancel functions for the registry to execute.
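As a hedged sketch, wiring up a (hypothetical) job type looks roughly like this; the signatures follow pkg/jobs at the time of writing, and execCtx is an opaque handle whose concrete type has varied across versions:

```go
package myjob

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

type resumer struct{ job *jobs.Job }

// Resume performs the forward work of the job, persisting progress as it goes.
func (r *resumer) Resume(ctx context.Context, execCtx interface{}) error { return nil }

// OnFailOrCancel unwinds whatever partial work Resume performed.
func (r *resumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { return nil }

func init() {
	jobs.RegisterConstructor(jobspb.TypeBackup, // the jobspb.Type this Resumer handles
		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
			return &resumer{job: job}
		})
}
```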
When a job is adopted and resumed by a node's registry, provided the job is
not already running or completed, runJob is run in its own goroutine as an
async task. During adoption, a new context is created for the runJob execution
with its own cancel function, which is stored in the registry's internal map
of adopted jobs. This cancel callback allows the registry to remotely
terminate the job when it must servePauseAndCancelRequests.
This goroutine executes the job by modeling it as a state machine through the
registry's stepThroughStateMachine, starting from the StatusRunning state. It
calls resumer.Resume to execute the job-specific logic, and once that function
completes it recursively calls stepThroughStateMachine to transition to the
appropriate next state, depending on whether the resumption completed
successfully or ended with some form of error (either through the job's own
execution or through a context error triggered by
servePauseAndCancelRequests). As it transitions through the state machine it
updates job state accordingly and potentially calls resumer.OnFailOrCancel for
job-specific handling. Eventually the original stepThroughStateMachine call
exits and the runJob goroutine can complete.
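A greatly simplified, self-contained sketch of that recursive flow (the types here are stand-ins for the real ones in pkg/jobs, which handles more states, retries, and bookkeeping):

```go
package sketch

import (
	"context"
	"fmt"
)

type Status string

const (
	StatusRunning   Status = "running"
	StatusReverting Status = "reverting"
	StatusSucceeded Status = "succeeded"
	StatusFailed    Status = "failed"
)

type Resumer interface {
	Resume(ctx context.Context) error
	OnFailOrCancel(ctx context.Context) error
}

// Each call handles one state and recurses into the next until a
// terminal state is reached.
func stepThroughStateMachine(ctx context.Context, r Resumer, status Status, jobErr error) error {
	switch status {
	case StatusRunning:
		if err := r.Resume(ctx); err != nil {
			// Job failure, or cancellation surfacing as a context error: revert.
			return stepThroughStateMachine(ctx, r, StatusReverting, err)
		}
		return stepThroughStateMachine(ctx, r, StatusSucceeded, nil)
	case StatusReverting:
		if err := r.OnFailOrCancel(ctx); err != nil {
			return fmt.Errorf("revert failed: %w", err)
		}
		return stepThroughStateMachine(ctx, r, StatusFailed, jobErr)
	case StatusSucceeded:
		return nil // persist StatusSucceeded on the job record
	case StatusFailed:
		return jobErr // persist StatusFailed and the error on the job record
	default:
		return fmt.Errorf("unexpected status: %s", status)
	}
}
```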
Jobs on their own are triggered by specific user commands such as BACKUP or
CREATE CHANGEFEED; however, CockroachDB also supports scheduling jobs to run in
the future and to recur based on a crontab schedule. As of this writing,
scheduling is only used for backups with CREATE SCHEDULE FOR BACKUP.
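For example (the storage URI is a placeholder):

```sql
CREATE SCHEDULE nightly_backup
  FOR BACKUP INTO 's3://backup-bucket/db?AUTH=implicit'
  RECURRING '@daily'
  FULL BACKUP ALWAYS;
```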
Similar to Jobs, Schedules can be PAUSEd, RESUMEd, or DROPped via {cmd}
SCHEDULES {select_clause}, handled by controlSchedulesNode.startExec.
PAUSE and RESUME simply set the next_run of the schedule to either
an empty value or the next iteration according to the schedule_expr, while
DROP deletes the record from the table.
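For example, to pause every schedule with a given label:

```sql
PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = 'nightly_backup';
```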
Job Schedules are stored in the
system.scheduled_jobs
table. A more user-readable version can be viewed using the SHOW SCHEDULES SQL
statement.
```sql
CREATE TABLE system.scheduled_jobs (
    schedule_id INT DEFAULT unique_rowid() NOT NULL,
    schedule_name STRING NOT NULL,
    created TIMESTAMPTZ NOT NULL DEFAULT now(),
    owner STRING NOT NULL,
    next_run TIMESTAMPTZ,   -- the next scheduled run of the job in UTC, NULL if paused
    schedule_state BYTES,   -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleState', schedule_state)
    schedule_expr STRING,   -- job schedule in crontab format; if empty the schedule will not recur
    schedule_details BYTES, -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleDetails', schedule_details)
    executor_type STRING NOT NULL,
    execution_args BYTES NOT NULL,
    ...
)
```
The schedule's behavior and current status are stored in the schedule_details
and schedule_state columns respectively, which use the following protobuf
formats:
```protobuf
// How to schedule and execute the job.
message ScheduleDetails {
  // How to handle encountering a previously started job that hasn't completed yet
  enum WaitBehavior {
    WAIT = 0;    // Wait for the previous run to complete, then run
    NO_WAIT = 1; // Do not wait and just run a potentially overlapping execution
    SKIP = 2;    // If the previous run is ongoing, just advance the schedule without running
  }
  // How to proceed if the scheduled job fails
  enum ErrorHandlingBehavior {
    RETRY_SCHED = 0; // Allow the job to execute again the next time it's scheduled to do so
    RETRY_SOON = 1;  // Retry the job almost immediately after failure
    PAUSE_SCHED = 2; // Pause the schedule entirely
  }
  WaitBehavior wait = 1;
  ErrorHandlingBehavior on_error = 2;
}

// Mutable state for the schedule such as error strings
message ScheduleState {
  string status = 1;
}
```
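As with system.jobs, both columns can be decoded from SQL using the helper named in the table comments above:

```sql
SELECT schedule_id, schedule_name,
       crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleDetails', schedule_details) AS details,
       crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleState', schedule_state) AS state
FROM system.scheduled_jobs;
```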
A CRDB node writes to a row in the scheduled jobs table through the ScheduledJob
struct by queuing up individual changes in its dirty property and then either creating
or updating
a row in the table to commit them. Next, we'll discuss when a CRDB node would write to this table.
A ScheduledJob struct is first created via NewScheduledJob (ex: in
makeBackupSchedule), initializing the struct, whose properties can then be set
using the setter functions. Finally, the data in the struct is persisted into
the scheduled_jobs table via ScheduledJob.Create.
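A hedged sketch of that pattern; the setter names follow pkg/jobs/scheduled_job.go at the time of writing and may have drifted:

```go
sj := jobs.NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)
sj.SetScheduleLabel("nightly_backup")
sj.SetOwner(owner)
// Parses the crontab expression and marks next_run dirty.
if err := sj.SetSchedule("@daily"); err != nil {
	return err
}
// Persist the dirty columns as a new row in system.scheduled_jobs.
if err := sj.Create(ctx, execCfg.InternalExecutor, txn); err != nil {
	return err
}
```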
When a SQLServer initializes, it calls
StartJobSchedulerDaemon
to start the
job-scheduler
async task, similar to other job background tasks. Unless
jobs.scheduler.enabled
is false, on an interval of
jobs.scheduler.pace
(default 1 minute), the node will attempt to
process
up to
jobs.scheduler.max_jobs_per_iteration
schedules (default 10) with a next_run timestamp earlier than the current time.
If no jobs from the current execution of the schedule have already started, a
type-specific executor is called to queue the appropriate jobs for execution
(ex: executeBackup). Once jobs created by this schedule are observed,
processSchedule advances the schedule's next_run according to the
schedule_expr, with some added jitter to avoid conflicting transactions.
The executor for a given type of Job Schedule is registered via the
RegisterScheduledJobExecutorFactory function (ex: registering
ScheduledBackupExecutor). The factory must return a struct that implements the
ScheduledJobExecutor interface.
```go
type ScheduledJobExecutor interface {
	ExecuteJob(...) error
	NotifyJobTermination(...) error
	Metrics() metric.Struct
	GetCreateScheduleStatement(...) (string, error)
}
```
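Registration, sketched below; the factory signature follows pkg/jobs at the time of writing, and myExecutor is a hypothetical type assumed to implement the interface:

```go
func init() {
	jobs.RegisterScheduledJobExecutorFactory(
		"my-executor", // matches the executor_type column of system.scheduled_jobs
		func() (jobs.ScheduledJobExecutor, error) {
			return &myExecutor{}, nil
		})
}
```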
NB: Jobs created by a schedule will have their created_by_type and created_by_id
columns set to that of the schedule that created them.
In this section, we detail the code paths taken by CRDB SQL statements that spawn jobs, such as BACKUP, CHANGEFEED and Schema Changes (e.g. ALTER).
SQL statements that rely on the job system begin to branch from the
conventional life of a query during Logical Planning and Optimization
(read through that section first!). Specifically, after following a few
internal calls from makeExecPlan, we call tryBuildOpaque to convert an AST
into a memo. tryBuildOpaque will match the statement to one of the
opaqueStatements, a map populated by opaque.go's init(), and then call
buildOpaque. Each kind of job will take a different path through buildOpaque,
which we'll discuss in more detail later. Further, unlike the normal logical
planning path for SQL queries, tryBuildOpaque skips Cockroach's optimization
engine and returns a populated memo object. Why skip query optimization?
Cockroach SQL statements that spawn jobs don't contain any subqueries or
additional operators that would benefit from query optimization (e.g. JOIN,
SELECT, WHERE).
Finally, just like any regular query, the job's memo object is then converted
into a query plan, and execWithDistSQLEngine starts the job, specifically by
spawning a new goroutine (it's not clear that all jobs go through the hookFn
node). The exact function run by the goroutine varies by job. Note that,
unlike many other query plans, the DistSQL engine will not create a
distributed physical plan for the job -- which you can verify by checking the
distributePlan variable -- rather, each specific job will manually distribute
work across nodes during execution. Next we'll discuss how certain CRDB
features spawn jobs in the DistSQL engine.
A subset of CRDB features -- BACKUP, RESTORE, IMPORT, CHANGEFEED -- are
licensed under the Cockroach Community License (CCL) and plan jobs in a
similar fashion. During logical planning, each CCL job executes its own
specific planHookFn which, most importantly, returns a PlanHookRowFn, a
function that in turn gets called through the execWithDistSQLEngine stack.
The PlanHookRowFn is responsible for planning the job and writing a job record
to the jobs table (i.e. creating the job).
There are some common themes to note in a planHookRowFn. We'll hyperlink to
backup_planning.go to illustrate.

- p.ExtendedEvalContext().Txn is a transaction handle scoped to the entire
  execution of the SQL statement, accessed via the planHookState interface.
  Most notably, we write the job record to the jobs table using this txn. When
  we commit this transaction, all its operations commit; else, all its
  operations roll back.
- To read something quickly and transactionally during planning, a
  planHookRowFn often invokes a new transaction (eg 1, 2) using
  p.ExecutorConfig.DB. If you're unfamiliar with transactions in CRDB, read
  the KV Client Interface section of Life of a Query. These transactions
  commit on their own; by contrast, p.ExtendedEvalContext().Txn uses an
  explicit transaction and commits in connExecutor land.
- By default, the job runs to completion within the planHookRowFn! Indeed, the
  SQL shell will not return until after the job completes! If the statement is
  run with the detached parameter instead, the planHookRowFn returns right
  after writing the job record to the jobs table.

TODO (hopefully by someone on SQL Schema :))))
Next, we describe how various kinds of jobs work, i.e. when a node picks up a Backup or Changefeed job, what does it actually do? This will be more high level, and will point towards RFCs/docs/blog posts for further details.
TODO(Shiranka): CDC
TODO(???): Schema Changes
TODO(MB)
Before reading through this code walkthrough of restore, watch the MOLTing tutorial on restore. This walkthrough doesn't consider the distinct code paths each type of RESTORE may take.
In addition to general CCL job planning, planning a restore has the following main components.
When a gateway node resumes a restore job, the following occurs before any processors spin up. For more on processors, check out the related section in Life of a Query.
We're now ready to begin loading the backup data into our cockroach cluster.
Important note: the last range in the restoring cluster's key space is one big empty range, with a key span starting above the highest key with data in the cluster and extending up to the max key allowed in the cluster. We want to restore into that empty range. Recall that a key span is merely an interval of keys, while a range is deeply physical: it represents stored data, in a given key span, that is replicated across nodes.
Our first task is to split this massive empty range up into smaller ranges that we will restore data into, and randomly assign nodes to be leaseholders for these new ranges. More concretely, the restore job's gateway node will iterate through the list of key spans we seek to restore, and round robin assign them to nodes in the cluster which will then each start up a split and scatter processor.
Each split and scatter processor will then do the following, for each key span it processes:
In addition to running a split and scatter processor, each node will run a restore data processor. For each empty range the restore data processor receives, it will read the relevant backup SSTables in external storage, remap each key to the restore's key space, and flush SSTable(s) to disk using the KV client interface's AddSSTable method, which bypasses much of the infrastructure related to writing data to disk from conventional queries. Note: all the KV shenanigans (e.g. range data replication, range splitting/merging, leaseholder reassignment) are abstracted away from the bulk codebase, though these things can happen while the restore is in progress!
TODO: Talk about how Backup uses two schedules that depend on each other and must be cleaned up together