Back to Cockroach

The Job System

docs/tech-notes/jobs.md

26.1.336.9 KB
Original Source

The Job System

Original authors: Michael Butler & Shiranka Miskin (November 2021)

What is a Job?

Jobs are CockroachDB's way of representing long running background tasks. They are used internally as a core part of various features of CockroachDB such as Changefeeds, Backups, and Schema Changes. Progress is regularly persisted such that a node may fail / the job can be paused and a node will still be able to resume the work later on.

User Control

Users can send PAUSE/RESUME/CANCEL commands to specific jobs via SQL commands such as PAUSE JOB {job_id} for a single job, to an arbitrary set of jobs with commands such as CANCEL JOBS {select_clause}, to specific job types through commands such as RESUME ALL CHANGEFEED JOBS, or to jobs triggered by a given schedule through commands such as PAUSE JOBS FOR SCHEDULE {schedule_id}.

Commands to specific job ids are handled through controlJobsNode.startExec. PAUSE and CANCEL commands move the job to a pause-requested and cancel-requested state respectively, which allows the node currently running the job to proceed with actually pausing or cancelling it (see jobs/cancel in Job Management). RESUME moves the job to a running state to be picked up by any node (see jobs/adopt in Job Management).

The batch commands based on a schedule / type are simply delegated to the respective {cmd} JOBS command via delegateJobControl.

Internal Representation

A Job is represented as a row in the system.jobs table which is the single source of truth for individual job information. Nodes read this table to determine the jobs they can execute, using the payload to determine how to execute them and the progress to pick up from where other nodes may have left it. Completed/cancelled jobs are eventually garbage collected from system.jobs.

sql
CREATE TABLE system.jobs (
  id                INT8      DEFAULT unique_rowid(),
  created           TIMESTAMP NOT NULL DEFAULT now(),

  -- States such as "pending", "running", "paused", "pause-requested", etc.
  status            STRING    NOT NULL,

  -- Information specific to each type of job (ex: Changefeed, Backup).
  payload           BYTES     NOT NULL,   -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload)
  progress          BYTES,                -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress)

  -- Used to track which node currently owns execution of the job
  claim_session_id  BYTES,
  claim_instance_id INT8,

  -- The system that created the job and the id relevant to that system (ex: crdb_schedule and the schedule_id)
  created_by_type   STRING,
  created_by_id     INT,

  -- Useful for observability
  num_runs          INT8,
  last_run          TIMESTAMP,

  ... -- constraint/index/family
)

Jobs are primarily specified and differentiated through their payload column which are bytes of type jobspb.Payload, storing information related to the type, parameters, and metadata of the job.

protobuf
message Payload {
  // General information relevant to any job
  string description = 1;
  repeated string statement = 16;
  int64 started_micros = 3;
  int64 finished_micros = 4;
  ...
  repeated errorspb.EncodedError resume_errors = 17;
  repeated errorspb.EncodedError cleanup_errors = 18;
  errorspb.EncodedError final_resume_error = 19;
  ...
  // Type-specific payload details that mark the type of job and stores type-specific information
  oneof details {
    BackupDetails backup = 10;
    RestoreDetails restore = 11;
    ... // Details for the many other types of jobs
  }
  ...
}

The progress of the specified job through its lifecycle is tracked in a separate progress column of type jobspb.Progress.

protobuf
message Progress {
  // Generic information for observability
  oneof progress {
    float fraction_completed = 1;
    util.hlc.Timestamp high_water = 3;
  }
  string running_status = 4;

  // Type-specific progress information similar to Payload details
  oneof details { // Note that while this is also called details, it does not take {JobType}Details structs like Payload
    BackupProgress backup = 10;
    RestoreProgress restore = 11;
    ... // Progress for the many other types of jobs
  }
  ...
}

A more user-readable version of system.jobs with the parsed payload and progress information can be monitored through the crdb_internal.jobs table (which simply reads from system.jobs). The SHOW JOBS command operates by reading from crdb_internal.jobs.

A Job is created when a new row is inserted to system.jobs. Once written, it is able to be "claimed" by any node in the cluster (asserting that it is responsible for executing the job) through the setting of claim_session_id and claim_instance_id in the job record. Job execution ends when the status changes from running to success, cancelled, or failure. The job record eventually gets deleted from the system table -- by default, 14 hours after execution ends.

If that node fails to completely execute the job (either the node failing or the job being paused), once the job record is in the table it is also able to be resumed by any node in the cluster. The mechanism by which this is done is the JobRegistry.

The Job Registry

A node interacts with the jobs table through the JobRegistry struct.

Job Creation

A node creates a job by calling the Registry's CreateJobWithTxn or CreateAdoptableJobWithTxn which INSERTs a new row into system.jobs . The node passes the job specific information to these functions via the Record struct.

If the node created the job with CreateJobWithTxn, it will also claim the job by setting the claim IDs and start the job. By contrast, CreateAdoptableJobWithTxn allows another node to adopt and resume the job (startableJob.Start() is never called on these jobs).

Job Management

While job creation is triggered in response to client queries such as BACKUP, job adoption, cancellation, and deletion is managed through daemon goroutines. These daemon goroutines will continually run on each node, allowing any node to participate in picking up work, unassigning jobs that were on nodes that failed, and cleaning up old job records from the table.

These goroutines begin when each node's SQL Server starts, initializing the JobRegistry via jobRegistry.Start(...). Specifically, jobRegistry.Start() kicks off the following goroutines:

Job Execution

Each job type registers its respective execution logic through jobs.RegisterConstructor which globally registers a Constructor function for a given jobspb.Type. This Constructor returns an implementation of jobs.Resumer with the Resume and OnFailOrCancel functions for the registry to execute.

When a job is adopted and resumed by a node's registry, unless the job is not already running or completed, runJob is ran in its own goroutine as an async task. During adoption a new context is created for the runJob execution with its own cancel function that is stored in the registry's internal map of adopted jobs. This cancel callback stored in the registry allows it to remotely terminate the job when it must servePauseAndCancelRequests.

This thread handles executing the job by modeling it as a state machine through the registry's stepThroughStateMachine, starting from the StatusRunning state. It calls resumer.Resume to execute the job-specific logic, and once the function completes it recursively calls stepThroughStateMachine to transition to the appropriate next state depending on if the resumption completed successfully or due to some form of error (either through the job's own execution or through a context error triggered by servePauseAndCancelRequests). As it transitions through the state machine it updates job state accordingly and potentially calls resumer.OnFailOrCancel for job-specific handling. Eventually the original stepThroughStateMachine exits and the runJob thread can complete.

Scheduled Jobs

Jobs on their own are triggered by specific user commands such as BACKUP or CREATE CHANGEFEED, however cockroachdb also supports scheduling jobs to run in the future and be able to recur based on a crontab schedule. As of writing this is only used for backups with CREATE SCHEDULE FOR BACKUP.

Similar to Jobs, Schedules can be PAUSEd, RESUMEd, or DROPped via {cmd} SCHEDULES {select_clause} and are handled by controlSchedulesNode.startExec. PAUSE and RESUME simply set the next_run of the schedule to either an empty value or the next iteration according to the schedule_expr, while CANCEL deletes the record from the table.

Job Schedules are stored in the system.scheduled_jobs table. A more user-readable version can be viewed using the SHOW SCHEDULES SQL statement.

sql
CREATE TABLE system.scheduled_jobs (
    schedule_id      INT DEFAULT unique_rowid() NOT NULL,
    schedule_name    STRING NOT NULL,
    created          TIMESTAMPTZ NOT NULL DEFAULT now(),
    owner            STRING NOT NULL,
    next_run         TIMESTAMPTZ, -- the next scheduled run of the job in UTC, NULL if paused
    schedule_state   BYTES,       -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleState', schedule_state) 
    schedule_expr    STRING,      -- job schedule in crontab format, if empty the schedule will not recur
    schedule_details BYTES,       -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleDetails', schedule_details)
    executor_type    STRING NOT NULL,
    execution_args   BYTES NOT NULL,
    ...
)

State on the schedule's behavior and current status are stored in the schedule_details and schedule_state columns respectively which follow the following protobuf formats:

protobuf
// How to schedule and execute the job.
message ScheduleDetails {
  // How to handle encountering a previously started job that hasn't completed yet
  enum WaitBehavior {
    WAIT = 0; // Wait for previous run to complete then run
    NO_WAIT = 1; // Do not wait and just run a potentially overlapping execution
    SKIP = 2; // If the previous run is ongoing, just advance the schedule without running
  }

  // How to proceed if the scheduled job fails
  enum ErrorHandlingBehavior {
    RETRY_SCHED = 0; // Allow the job to execute again next time its scheduled to do so
    RETRY_SOON = 1; // Retry the job almost immediately after failure
    PAUSE_SCHED = 2; // Pause the schedule entirely
  }

  WaitBehavior wait = 1;
  ErrorHandlingBehavior on_error = 2;
}

// Mutable state for the schedule such as error strings
message ScheduleState {
  string status = 1;
}

A CRDB node writes to a row in the scheduled jobs table through the ScheduledJob struct by queuing up individual changes in its dirty property and then either creating or updating a row in the table to commit them. Next, we'll discuss when a CRDB node would write to this table.

Schedule Creation

A ScheduledJob struct is first created via NewScheduledJob (ex: in makeBackupSchedule), initializing the struct which can then have its properties be set using the setter functions. Finally, the data in the struct is persisted into the scheduled_jobs table via ScheduledJob.Create.

Schedule Management

When a SQLServer initializes, it calls StartJobSchedulerDaemon to start the job-scheduler async task, similar to other job background tasks. Unless jobs.scheduler.enabled is false, on an interval of jobs.scheduler.pace (default 1 minute), the node will attempt to process up to jobs.scheduler.max_jobs_per_iteration schedules (default 10) with a next_run timestamp earlier than the current time.

If no jobs have successfully started for this current execution of the schedule, a type-specific executor will be called to queue the appropriate jobs to be executed (ex: executeBackup).

Once jobs created by this schedule are observed, the next_run of the schedule will be advanced by processSchedule according to the schedule_expr with some added jitter to avoid conflicting transactions.

The executor for a given type of Job Schedule is registered via the RegisterScheduledJobExecutorFactory function (ex: registering ScheduledBackupExecutor). The factory must return a struct that implements the ScheduledJobsExecutor interface.

go
type ScheduledJobExecutor interface {
	ExecuteJob(...) error
	NotifyJobTermination(...) error
	Metrics() metric.Struct
	GetCreateScheduleStatement(...) (string, error)
}

NB: Jobs created by a schedule will have their created_by_type and created_by_id columns set to that of the schedule that created them.

Applications of the Job System

In this section, we detail the code paths taken by CRDB SQL statements that spawn jobs, such as BACKUP, CHANGEFEED and Schema Changes (e.g. ALTER).

Job Planning

SQL Statments that rely on the job system begin to branch from the conventional life of a query during Logical Planning and Optimization (read through this section first!). Specifically, after following a few internal calls from makeExecPlan, we call trybuildOpaque to convert an AST into a memo.

  • Detailed Note: tryBuildOpaque will match the statement to one of the opaqueStatements, a map populated by Opaque.go’s init() , and then, call buildOpaque mysteriously here. Each kind of job will take a different path through buildOpaque, which we’ll discuss in more detail later.

Further, unlike the normal logical planning path for SQL queries, tryBuildOpaque skips Cockroach’s optimization engine, and returns a populated memo object. Why skip query optimization?? Cockroach SQL statements that spawn jobs don’t contain any subqueries or additional operators that would benefit from query optimization (e.g. JOIN, SELECT, WHERE).

Finally, just like any regular query, the job’s memo object is then converted (specifically here) into a query plan, and execWithDistSQLEngine starts the job, specifically via spawning a new goroutine here (not sure if all jobs go through hookfn node). The exact function run by the go routine varies by job. Note, unlike many other query plans , DistSQLEngine will not create a distributed physical plan for the job--which you can verify by checking the distributePlan variable-- rather, each specific job will manually distribute work across nodes during execution. Next we'll discuss how certain CRDB features spawn jobs in the DistSQL Engine.

CCL Job Planning

A subset of CRDB features -- BACKUP, RESTORE, IMPORT, CHANGEFEED -- are licensed under the Cockroach Community License (CCL) and plan jobs in a similar fashion.

During logical planning, each CCL job executes their own specific planhookFn that most importantly returns a PlanHookRowFn, a function which in turn gets called through the execWithDistSQLEngine stack here. The PlanHookRowFn is responsible for planning the job and writing a job record to the jobs table (i.e. creating the job).

There are some common themes to note in a planHookRowFn. We’ll hyperlink to backup_planning.go to illustrate.

  • Throughout job planning, several (but not all) read and write requests get sent to the kv layer using p.ExtendedEvalContext().Txn a transaction handler scoped for the entire execution of the SQL statement, accessed via the planHookState interface. Most notably, we write the job record to the jobs table using this txn. When we commit this transaction, all operations will commit; else, all operations rollback. To read something quickly and transactionally during planning, planHookRowFn often invokes a new transaction (eg 1 , 2) using p.ExecutorConfig.DB. If you’re unfamiliar with transactions in CRDB, read the KV Client Interface section of Life of a Query.
  • By default, bulk jobs (IMPORT, BACKUP, and RESTORE) commit the transaction, start and wait for the job to complete, all within the planHookRowFn! Indeed, the SQL shell will not return until after the job completes!
    • Advanced Note: These jobs use an implicit transaction to control when the transaction commits / rolls back outside the Txn API. When bulk jobs run with the detached parameter instead, the planHookRowFn returns right after writing the job record to the jobs table. By contrast, p.ExtendedEvalContext().Txn uses an explicit transaction and commits in connExecutor land.

Schema Changes Job Planning

TODO (hopefully by someone on SQL Schema :))))

Job Execution

Next, we describe how various kinds of jobs work, i.e. when a node picks up a Backup or Changefeed job, what does it actually do? This will be more high level, and will point towards RFCs/Docs/Blogposts for further details. TODO(Shiranka): CDC TODO(???): Schema Changes

Backup

TODO(MB)

Restore

Before reading through this code walk through of restore, watch the MOLTing tutorial on restore. This walkthrough doesn't consider the distinct code paths each type of RESTORE may take.

Planning

In addition to general CCL job planning, planning a restore has the following main components.

  • Resolve the location of the backup files we seek to restore.
  • Figure out which descriptors the user wants to restore. Descriptors are objects that hold metadata about various SQL objects, like columns or databases.
  • Allocate new descriptor IDs for the descriptors we're restoring from the backup files. Why do this? Every descriptor on disk has a unique id, so RESTORE must resolve ID collisions between the stale ID's in the back up and any IDs in the target cluster.

Execution

When a gateway node resumes a restore job, the following occurs before any processors spin up. For more on processors, check out the related section in Life of a Query.

  • Write the new descriptors to disk in an offline state so no users can interact with the descriptors during the restore.
  • Derive a list of key spans we need to restore from the restoring descriptors. A key span is just an interval in the backup's key space.

We're now ready to begin loading the backup data into our cockroach cluster.

Important note: the last range in the restoring cluster's key space is one big empty range, with a key span starting above the highest key with data in the cluster up to the max key allowed in the cluster. We want to restore to that empty range. Recall that a key span is merely an interval of keys while a range has deeply physical representation: it represents stored data, in a given key span, that is replicated across nodes.

Our first task is to split this massive empty range up into smaller ranges that we will restore data into, and randomly assign nodes to be leaseholders for these new ranges. More concretely, the restore job's gateway node will iterate through the list of key spans we seek to restore, and round robin assign them to nodes in the cluster which will then each start up a split and scatter processor.

Each split and scatter processor will then do the following, for each key span it processes:

  • Issue a split key request to the kv layer at the beginning key of the next span it will process, which splits that big empty range at that given key, creating a new range to import data into. I recommend reading the code and comments here because the indexing is a little confusing.
    • Note: before the split request, we remap this key (currently in the backup's key space) so it maps nicely to the restore cluster's key space. E.g. suppose we want to restore a table with a key span in the backup from 57/1 to 57/2; but the restore cluster already has data in that span. To avoid collisions, we have to remap this key span into the key span of that empty range.
  • Issue a scatter request to the kv layer on the span's first key. This asks kv to randomly reassign the lease of this key's range. KV may not obey the request.
  • Route info to this new range's new leaseholder, so the leaseholder can restore data into that range.

In addition to running a split and scatter processor, each node will run a restore data processor. For each empty range the restore data processor receives, it will read the relevant Backup SSTables in external storage, remap each key to the restore's key space, and flush SSTable(s) to disk, using the kv client interface's AddSStable method, which bypasses much of the infrastructure related to writing data to disk from conventional queries. Note: all the kv shenanigans (e.g. range data replication, range splitting/merging, leaseholder reassignment) is abstracted away from the bulk codebase, though these things can happen while the restore is in process!

TODO: Talk about how Backup uses two schedules that depend on each other and must be cleaned up together