Job service is designed to handle and process asynchronous background jobs in an easy way. It is built on top of the gocraft/work job queue framework and supports the following additional capabilities:

- Job status tracking, including error, success, stopped, cancelled and scheduled.
- Stopping and cancelling jobs.

With job service, you can:

- Launch a Generic job, which is executed immediately if worker resources are available and can only be executed once.
- Launch a Scheduled job, which is executed after a specified delay.
- Launch a Periodic job, which is repeatedly executed at a specified interval.
- Use the unique flag to make sure no duplicated jobs are executing at the same time.

The overall architecture of the job service is shown in the graph below:
Components:

- … Periodic ones.
- … Periodic jobs.
- … redis.

Currently, the worker (compute node) and the controller (control plane) are packaged in one process. To achieve scalability and high availability, multiple nodes can be deployed behind a load balancer (LB).
As described in the graph above, controllers and worker pools located on different nodes can also talk to each other through a virtual channel: the backend persistence driver. That means a job enqueued by one controller may be picked up by a worker pool located on another node.
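The "virtual channel" idea can be pictured with a minimal in-process analogy. In this sketch a buffered Go channel stands in for the redis backend (an assumption for illustration only; the real service relies on gocraft/work and redis), and the hypothetical `enqueue` and `runPools` functions play the controller and the worker pools. Any pool may pick up any job, which is the property described above.

```go
package main

import (
	"fmt"
	"sync"
)

// enqueue plays the role of a controller: it pushes job IDs onto the shared
// backend queue and closes it. Assumption: a buffered channel stands in for redis.
func enqueue(backend chan<- string, jobIDs []string) {
	for _, id := range jobIDs {
		backend <- id
	}
	close(backend)
}

// runPools starts n hypothetical worker pools that all consume from the same
// backend queue and returns how many jobs each pool processed. Which pool picks
// a given job is not deterministic, like pools on different nodes sharing redis.
func runPools(backend <-chan string, n int) []int {
	counts := make([]int, n)
	var wg sync.WaitGroup
	for p := 0; p < n; p++ {
		wg.Add(1)
		go func(pool int) {
			defer wg.Done()
			for range backend {
				counts[pool]++ // each goroutine writes only its own slot
			}
		}(p)
	}
	wg.Wait()
	return counts
}

func main() {
	jobs := []string{"job-1", "job-2", "job-3", "job-4", "job-5"}
	backend := make(chan string, len(jobs))
	enqueue(backend, jobs)
	fmt.Println("jobs processed per pool:", runPools(backend, 2))
}
```

The split between the pools varies from run to run; only the total is fixed.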
To let the job service recognize a job, the job implementation must follow the programming model.

A valid job must implement the job interface. For the details of each method defined in the job interface, refer to the comments attached to the method.
```go
// Interface defines the related injection and run entry methods.
type Interface interface {
	// Declare how many times the job can be retried if failed.
	//
	// Return:
	// uint: the failure count allowed. If it is set to 0, then default value 4 is used.
	MaxFails() uint

	// Max concurrency of the job. Unlike the WorkerPool concurrency, it controls the limit on the number of jobs of that type
	// that can be active at one time within a single redis instance.
	//
	// The default value is 0, which means "no limit on job concurrency".
	MaxCurrency() uint

	// Tell the worker whether to retry the failed job when the failure count is
	// still less than the number declared by the method 'MaxFails'.
	//
	// Returns:
	// true for retry and false for no retry
	ShouldRetry() bool

	// Indicate whether the parameters of job are valid.
	//
	// Return:
	// error if parameters are not valid. NOTES: If no parameters are needed, directly return nil.
	Validate(params Parameters) error

	// Run the business logic here.
	// The related arguments will be injected by the workerpool.
	//
	// ctx Context : Job execution context.
	// params map[string]any : key-value parameters for the job execution.
	//
	// Returns:
	// error if failed to run. NOTES: If job is stopped or cancelled, a specified error should be returned
	//
	Run(ctx Context, params Parameters) error
}
```
Note that your main logic should be written in the Run method.

A job context is provided when the Run logic is executed. With this context, you can:

- check for the stop and cancel signals;
- use the checkin func to check in messages.

To make the job cancellable, some special logic should be coded in the Run logic.
First, check the signal at certain execution points: `if cmd, ok := ctx.OPCommand(); ok {}`

Then, check if it is a cancel signal: `if cmd == opm.CtlCommandCancel {}`

Finally, if it is, exit the logic and return the cancel error: `return errs.JobCancelledError()`
To make the job stoppable, some special logic should be coded in the Run logic.

First, check the signal at certain execution points: `if cmd, ok := ctx.OPCommand(); ok {}`

Then, check if it is a stop signal: `if cmd == opm.CtlCommandStop {}`

Finally, if it is, exit the logic and return the stop error: `return errs.JobStoppedError()`
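Putting the stop and cancel checks together, a long-running Run usually polls for the signal between units of work. The sketch below is self-contained, so it defines hypothetical stand-ins: `opCommand`, `fakeContext`, `errJobStopped` and `errJobCancelled` only mimic `ctx.OPCommand()`, `opm.CtlCommandStop`/`opm.CtlCommandCancel` and `errs.JobStoppedError()`/`errs.JobCancelledError()`; they are not the Harbor types.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for opm.CtlCommandStop / opm.CtlCommandCancel.
type opCommand string

const (
	ctlCommandStop   opCommand = "stop"
	ctlCommandCancel opCommand = "cancel"
)

// Hypothetical stand-ins for errs.JobStoppedError() / errs.JobCancelledError().
var (
	errJobStopped   = errors.New("job is stopped")
	errJobCancelled = errors.New("job is cancelled")
)

// fakeContext mimics only the OPCommand method of the job context:
// it reports cmd once the current step reaches signalAt.
type fakeContext struct {
	step     int
	signalAt int
	cmd      opCommand
}

func (c *fakeContext) OPCommand() (opCommand, bool) {
	if c.cmd != "" && c.step >= c.signalAt {
		return c.cmd, true
	}
	return "", false
}

// run processes totalSteps units of work, checking for a signal before each one.
// It returns the number of completed steps and the exit error, if any.
func run(ctx *fakeContext, totalSteps int) (int, error) {
	for ctx.step = 0; ctx.step < totalSteps; ctx.step++ {
		if cmd, ok := ctx.OPCommand(); ok {
			switch cmd {
			case ctlCommandCancel:
				return ctx.step, errJobCancelled
			case ctlCommandStop:
				return ctx.step, errJobStopped
			}
		}
		// ... one unit of real work would go here ...
	}
	return totalSteps, nil
}

func main() {
	done, err := run(&fakeContext{signalAt: 3, cmd: ctlCommandStop}, 10)
	fmt.Println(done, err) // 3 job is stopped
}
```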
To report more concrete status information, call the Checkin function of the job context, for example: `ctx.Checkin("30%")`
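When the amount of work is known up front, the check-in message can be derived from progress. A small sketch, assuming the check-in text is free-form (the README only shows strings such as "30%"); `progressMessage` is a hypothetical helper whose result you would pass to `ctx.Checkin`.

```go
package main

import "fmt"

// progressMessage is a hypothetical helper that formats the share of finished
// items as a percentage string such as "30%". Integer division rounds down,
// so "100%" is only reported when done == total.
func progressMessage(done, total int) string {
	if total <= 0 {
		return "0%"
	}
	if done > total {
		done = total
	}
	return fmt.Sprintf("%d%%", done*100/total)
}

func main() {
	total := 10
	for done := 1; done <= total; done++ {
		// ... process item number `done` ...
		if done%3 == 0 || done == total {
			// In a real job this would be: ctx.Checkin(progressMessage(done, total))
			fmt.Println(progressMessage(done, total))
		}
	}
}
```

Checking in only every few items keeps the number of status updates small.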
Here is a demo job:
```go
// DemoJob is the job to demonstrate the job interface.
type DemoJob struct{}

// MaxFails is implementation of same method in Interface.
func (dj *DemoJob) MaxFails() uint {
	return 3
}

// MaxCurrency is implementation of same method in Interface.
func (dj *DemoJob) MaxCurrency() uint {
	return 1
}

// ShouldRetry is implementation of same method in Interface.
func (dj *DemoJob) ShouldRetry() bool {
	return true
}

// Validate is implementation of same method in Interface.
func (dj *DemoJob) Validate(params job.Parameters) error {
	if len(params) == 0 {
		return errors.New("parameters required for demo job")
	}
	name, ok := params["image"]
	if !ok {
		return errors.New("missing parameter 'image'")
	}
	// Two-value type assertion: a non-string value yields an error instead of a panic.
	image, ok := name.(string)
	if !ok || !strings.HasPrefix(image, "demo") {
		return fmt.Errorf("expected '%s' but got '%v'", "demo *", name)
	}
	return nil
}

// Run the demo logic here.
func (dj *DemoJob) Run(ctx job.Context, params job.Parameters) error {
	logger := ctx.GetLogger()
	defer func() {
		logger.Info("I'm finished, exit!")
		fmt.Println("I'm finished, exit!")
	}()

	fmt.Println("I'm running")
	logger.Info("=======Demo job running=======")
	logger.Infof("params: %#v\n", params)
	logger.Infof("context: %#v\n", ctx)
	if v, ok := ctx.Get("email_from"); ok {
		fmt.Printf("Get prop from context: email_from=%v\n", v)
	}
	if u, err := dao.GetUser(models.User{}); err == nil {
		fmt.Printf("u=%#+v\n", u)
	}

	// Report progress through check-in messages.
	logger.Info("check in 30%")
	ctx.Checkin("30%")
	time.Sleep(2 * time.Second)
	logger.Warning("check in 60%")
	ctx.Checkin("60%")
	time.Sleep(2 * time.Second)
	logger.Debug("check in 100%")
	ctx.Checkin("100%")
	time.Sleep(1 * time.Second)

	// HOLD ON FOR A WHILE, leaving time to send a stop/cancel signal.
	logger.Error("Holding for 15 sec")
	<-time.After(15 * time.Second)

	// Check whether a stop or cancel signal arrived in the meantime.
	if cmd, ok := ctx.OPCommand(); ok {
		logger.Infof("cmd=%s\n", cmd)
		fmt.Printf("Receive OP command: %s\n", cmd)
		if cmd == opm.CtlCommandCancel {
			logger.Info("exit for receiving cancel signal")
			return errs.JobCancelledError()
		}
		logger.Info("exit for receiving stop signal")
		return errs.JobStoppedError()
	}

	fmt.Println("I'm close to end")
	return nil
}
```
Job executions are used to track jobs related to a specified job, such as parent and child jobs. If a job has executions, the following two extra properties are appended to its job stats:

```json
{
  "job": {
    "executions": ["uuid-sub-job"],
    "multiple_executions": true
  }
}
```
For a job execution (sub job), there is an extra property upstream_job_id pointing to the ID of the upstream (parent) job:

```json
{
  "job": {
    "upstream_job_id": "parent-id"
  }
}
```
In that case, the parent job has the flag multiple_executions set to true, and its executions list contains the IDs of all its executions (sub jobs).
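A client can read these properties with plain `encoding/json`. The struct below is a hypothetical, partial view of the job stats covering only the fields shown above, and `describe` is an illustrative helper; `upstream_job_id` is simply empty for a job that has no parent.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobStats is a hypothetical, partial view of the job stats payload.
type jobStats struct {
	Job struct {
		ID                 string   `json:"id"`
		Executions         []string `json:"executions,omitempty"`
		MultipleExecutions bool     `json:"multiple_executions"`
		UpstreamJobID      string   `json:"upstream_job_id,omitempty"`
	} `json:"job"`
}

// describe reports how a job relates to other jobs.
func describe(raw []byte) (string, error) {
	var s jobStats
	if err := json.Unmarshal(raw, &s); err != nil {
		return "", err
	}
	switch {
	case s.Job.UpstreamJobID != "":
		return "execution of " + s.Job.UpstreamJobID, nil
	case s.Job.MultipleExecutions:
		return fmt.Sprintf("parent with %d executions", len(s.Job.Executions)), nil
	default:
		return "standalone job", nil
	}
}

func main() {
	parent := []byte(`{"job": {"executions": ["uuid-sub-job"], "multiple_executions": true}}`)
	child := []byte(`{"job": {"upstream_job_id": "parent-id"}}`)
	for _, raw := range [][]byte{parent, child} {
		d, err := describe(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(d)
	}
}
```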
Any job can launch new jobs through the LaunchJob function of the job context. All those jobs are tracked as sub jobs (executions) of the caller job.

```go
func (j *Job) Run(ctx job.Context, params job.Parameters) error {
	// ...
	subJob, err := ctx.LaunchJob(models.JobRequest{})
	if err != nil {
		return err
	}
	// Use the launched sub job (subJob) as needed.
	_ = subJob
	// ...
	return nil
}
```
A job launched with the Periodic kind is actually a scheduled job template, which is not run directly. The real running job is created by cloning the configuration from the job template. Each periodic job therefore has multiple job executions with independent IDs, and each job execution links to the periodic job through upstream_job_id.
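The template/execution relationship can be sketched as follows. The names `jobTemplate`, `execution` and `cloneExecution` are hypothetical, and the counter-based ID only stands in for the independent IDs (UUIDs in the examples) that the service assigns.

```go
package main

import "fmt"

// jobTemplate is a hypothetical model of the periodic job: stored, never run directly.
type jobTemplate struct {
	ID         string
	Name       string
	CronSpec   string
	Parameters map[string]any
}

// execution is a hypothetical model of a concrete run cloned from the template.
type execution struct {
	ID            string
	Name          string
	Parameters    map[string]any
	UpstreamJobID string // links back to the periodic job
}

// cloneExecution copies the template configuration into a new execution with
// its own ID. Parameters are copied so executions never mutate the template's map.
func cloneExecution(t jobTemplate, seq int) execution {
	params := make(map[string]any, len(t.Parameters))
	for k, v := range t.Parameters {
		params[k] = v
	}
	return execution{
		ID:            fmt.Sprintf("%s-exec-%d", t.ID, seq), // stand-in for a real UUID
		Name:          t.Name,
		Parameters:    params,
		UpstreamJobID: t.ID,
	}
}

func main() {
	tpl := jobTemplate{ID: "uuid-job", Name: "DEMO", CronSpec: "* 5 * * * *",
		Parameters: map[string]any{"p1": "just a demo"}}
	for i := 1; i <= 2; i++ {
		e := cloneExecution(tpl, i)
		fmt.Printf("%s -> upstream %s\n", e.ID, e.UpstreamJobID)
	}
}
```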
There are two kinds of loggers: one for the job service itself and another for the running jobs. Each logger can be configured with multiple logger backends.

Each backend logger is identified by a unique name, which is used in the logger configurations to enable the corresponding loggers. Meanwhile, each backend logger MUST implement the logger.Interface. A logger can also optionally support:

- sweeper.Interface
- getter.Interface

All the backend loggers SHOULD be onboarded via the static logger registry.
```go
// knownLoggers is a static logger registry.
// All the implemented loggers (w/ sweeper) should be registered
// with a unique name in this registry. Then they can be used to
// log info.
var knownLoggers = map[string]*Declaration{
	// File logger
	LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
	// STD output (both stdout and stderr) logger
	LoggerNameStdOutput: {StdFactory, nil, nil, true},
}
```
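The sketch below shows how such a static registry might be consulted when a configured backend name is resolved. It is a simplified, hypothetical model: `declaration` records only which optional capabilities a backend offers instead of holding factory functions, and the boolean last field of the real entries is assumed here to mark a singleton backend.

```go
package main

import "fmt"

// declaration is a simplified, hypothetical stand-in for the logger Declaration.
type declaration struct {
	HasSweeper bool
	HasGetter  bool
	Singleton  bool // assumption: meaning of the last boolean field
}

// registry maps backend names to their declarations.
var registry = map[string]declaration{
	"FILE":       {HasSweeper: true, HasGetter: true},
	"STD_OUTPUT": {Singleton: true},
}

// lookup resolves a configured backend name and fails for unknown names,
// so a typo in the configuration is reported at startup.
func lookup(name string) (declaration, error) {
	d, ok := registry[name]
	if !ok {
		return declaration{}, fmt.Errorf("logger backend %q is not registered", name)
	}
	return d, nil
}

func main() {
	for _, name := range []string{"FILE", "STD_OUTPUT", "SYSLOG"} {
		d, err := lookup(name)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%s: sweeper=%v getter=%v\n", name, d.HasSweeper, d.HasGetter)
	}
}
```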
So far, only the following two backends are supported:

- … job_log

Logger configuration options:
| Option | Description |
|---|---|
| loggers[x].name | The unique name of the logger backend |
| loggers[x].level | The logger level of the logger backend |
| loggers[x].settings | A hash map to pass extra settings of the logger backend. Depends on the implementation of the backend. |
| loggers[x].sweeper.duration | The interval of the sweeper loop (in days in the example below) |
| loggers[x].sweeper.settings | A hash map to pass extra settings of the sweeper. Depends on the implementation of sweeper. |
An example:
```yaml
#Loggers
loggers:
  - name: "STD_OUTPUT" # logger backend name, only support "DB", "FILE" and "STD_OUTPUT"
    level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
  - name: "FILE"
    level: "DEBUG"
    settings: # Customized settings of logger
      base_dir: "/tmp/job_logs"
    sweeper:
      duration: 1 #days
      settings: # Customized settings of sweeper
        work_dir: "/tmp/job_logs"
  - name: "DB"
    level: "DEBUG"
    sweeper:
      duration: 1 #days
```
The following configuration options are supported:
| Option | Description | ENV variable |
|---|---|---|
| protocol | Protocol used to serve HTTP | JOB_SERVICE_PROTOCOL |
| https_config.cert | The TLS certificate if the https protocol is enabled | JOB_SERVICE_HTTPS_CERT |
| https_config.key | The TLS key if the https protocol is enabled | JOB_SERVICE_HTTPS_KEY |
| port | API server listening port | JOB_SERVICE_PORT |
| worker_pool.workers | The worker concurrency number | JOB_SERVICE_POOL_WORKERS |
| worker_pool.backend | The job data persistence backend driver. So far, only redis is supported | JOB_SERVICE_POOL_BACKEND |
| worker_pool.redis_pool.redis_url | The redis URL if the backend is redis | JOB_SERVICE_POOL_REDIS_URL |
| worker_pool.redis_pool.namespace | The namespace used in redis | JOB_SERVICE_POOL_REDIS_NAMESPACE |
| loggers | Loggers for the job service itself. Refer to Configure loggers | |
| job_loggers | Loggers for the running jobs. Refer to Configure loggers | |
| core_server | The Harbor core server endpoint used to retrieve Harbor configurations | CORE_URL |
A configuration file example:

```yaml
---
#Protocol used to serve
protocol: "https"

#Certificate config if the 'https' protocol is used
https_config:
  cert: "server.crt"
  key: "server.key"

#Server listening port
port: 9443

#Worker pool
worker_pool:
  #Worker concurrency
  workers: 10
  backend: "redis"
  #Additional config if the 'redis' backend is used
  redis_pool:
    #redis://[arbitrary_username:password@]ipaddress:port/database_index
    #or ipaddress:port[,weight,password,database_index]
    redis_url: "localhost:6379"
    namespace: "harbor_job_service"

#Loggers for the running job
job_loggers:
  - name: "STD_OUTPUT" # logger backend name, only support "DB", "FILE" and "STD_OUTPUT"
    level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
  - name: "FILE"
    level: "DEBUG"
    settings: # Customized settings of logger
      base_dir: "/tmp/job_logs"
    sweeper:
      duration: 1 #days
      settings: # Customized settings of sweeper
        work_dir: "/tmp/job_logs"
  - name: "DB"
    level: "DEBUG"
    sweeper:
      duration: 1 #days

#Loggers for the job service
loggers:
  - name: "STD_OUTPUT" # Same with above
    level: "DEBUG"
```
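The `redis_url` comment in the configuration allows two formats. The sketch below normalizes the comma-separated form into a `redis://` URL. It assumes the fields are positional (`ipaddress:port[,weight,password,database_index]`) and that `weight` has no `redis://` equivalent and can be dropped; `normalizeRedisURL` is hypothetical, not the service's own parser.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// normalizeRedisURL is a hypothetical helper that turns
// "host:port[,weight,password,database_index]" into
// "redis://[:password@]host:port[/database_index]".
// Addresses that already use the redis:// scheme are returned unchanged.
func normalizeRedisURL(raw string) (string, error) {
	if strings.HasPrefix(raw, "redis://") {
		return raw, nil
	}
	parts := strings.Split(raw, ",")
	if parts[0] == "" || len(parts) > 4 {
		return "", fmt.Errorf("malformed redis address %q", raw)
	}
	u := url.URL{Scheme: "redis", Host: parts[0]}
	// parts[1] is the weight; assumption: it is dropped.
	if len(parts) > 2 && parts[2] != "" {
		u.User = url.UserPassword("", parts[2]) // password without a username
	}
	if len(parts) > 3 && parts[3] != "" {
		u.Path = "/" + parts[3] // database index
	}
	return u.String(), nil
}

func main() {
	for _, in := range []string{"localhost:6379", "10.0.0.1:6379,100,secret,2"} {
		out, err := normalizeRedisURL(in)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(in, "=>", out)
	}
}
```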
As the job service always runs in a backend environment, a simple secret-based authentication is used. To call the job service API, the Authorization header must be set:

`Authorization: Harbor-Secret <secret>`

The expected secret is passed to the job service by the ENV variable CORE_SECRET.
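On the server side, the check amounts to comparing the header value with the `CORE_SECRET` value. A minimal `net/http` middleware sketch: `requireSecret` and `statusFor` are hypothetical names and the plain-text 401 response is an assumption, while the header format is the one documented above. `crypto/subtle` keeps the comparison time independent of how much of the secret matched.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

const secretPrefix = "Harbor-Secret "

// requireSecret is a hypothetical middleware that rejects requests whose
// Authorization header is not "Harbor-Secret <secret>".
func requireSecret(secret string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h := r.Header.Get("Authorization")
		got := strings.TrimPrefix(h, secretPrefix)
		if secret == "" || !strings.HasPrefix(h, secretPrefix) ||
			subtle.ConstantTimeCompare([]byte(got), []byte(secret)) != 1 {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor runs one request through the middleware and returns the status code.
func statusFor(secret, authHeader string) int {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	if authHeader != "" {
		req.Header.Set("Authorization", authHeader)
	}
	rec := httptest.NewRecorder()
	requireSecret(secret, ok).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	// In the service, the expected secret would come from os.Getenv("CORE_SECRET").
	fmt.Println(statusFor("s3cr3t", "Harbor-Secret s3cr3t")) // 200
	fmt.Println(statusFor("s3cr3t", "Harbor-Secret wrong"))  // 401
}
```

Rejecting an empty configured secret avoids accidentally authorizing every request when CORE_SECRET is unset.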
Submit jobs
```jsonc
{
  "job": {
    "name": "demo",
    "parameters": {
      "p1": "just a demo"
    },
    "status_hook": "https://my-hook.com",
    "metadata": {
      "kind": "Generic", // or "Scheduled" or "Periodic"
      "schedule_delay": 90, // seconds, only required when kind is "Scheduled"
      "cron_spec": "* 5 * * * *", // only required when kind is "Periodic"
      "unique": false
    }
  }
}
```
Response
```jsonc
{
  "job": {
    "id": "uuid-job",
    "status": "pending",
    "name": "DEMO",
    "kind": "Generic",
    "unique": false,
    "ref_link": "/api/v1/jobs/uuid-job",
    "enqueue_time": "2018-10-10 12:00:00",
    "update_time": "2018-10-10 13:00:00",
    "multiple_executions": false // To indicate if the job has sub executions
  }
}
```

On error:

```json
{
  "code": 500,
  "err": "short error message",
  "description": "detailed error message"
}
```
Get job stats
Response
```jsonc
{
  "job": {
    "id": "uuid-job",
    "status": "pending",
    "name": "DEMO",
    "kind": "Periodic",
    "unique": false,
    "ref_link": "/api/v1/jobs/uuid-job",
    "enqueue_time": 1539164886,
    "update_time": 1539164886,
    "run_at": 1539164986,
    "cron_spec": "* 5 * * * * ",
    "check_in": "check in message", // if check in message
    "check_in_at": 1539164889, // if check in message
    "die_at": 0,
    "hook_status": "http://status-check.com",
    "executions": ["uuid-sub-job"], // the ids of sub executions of the job
    "multiple_executions": true
  }
}
```

On error:

```json
{
  "code": 500,
  "err": "short error message",
  "description": "detailed error message"
}
```
Stop/Cancel/Retry job
```jsonc
{
  "action": "stop" // or "cancel" or "retry"
}
```
Response
On error:

```json
{
  "code": 500,
  "err": "short error message",
  "description": "detailed error message"
}
```
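Only three actions are listed for this call. A tiny sketch that builds the request body and rejects anything else before a request is made; `actionBody` and `allowedActions` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// allowedActions lists the actions documented for a job (hypothetical helper data).
var allowedActions = map[string]bool{"stop": true, "cancel": true, "retry": true}

// actionBody returns the JSON body for a stop/cancel/retry request.
func actionBody(action string) ([]byte, error) {
	if !allowedActions[action] {
		return nil, fmt.Errorf("unsupported action %q", action)
	}
	return json.Marshal(map[string]string{"action": action})
}

func main() {
	for _, a := range []string{"stop", "pause"} {
		b, err := actionBody(a)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(string(b))
	}
}
```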
Retrieve job log
Response
Log text bytes
On error:

```json
{
  "code": 500,
  "err": "short error message",
  "description": "detailed error message"
}
```
Check job service healthy status
Response
```json
[{
  "worker_pool_id": "pool1",
  "started_at": 1539164886,
  "heartbeat_at": 1539164986,
  "job_names": ["DEMO"],
  "concurrency": 10,
  "status": "healthy"
}]
```

On error:

```json
{
  "code": 500,
  "err": "short error message",
  "description": "detailed error message"
}
```
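The health response is a list of worker pool stats. A sketch that decodes it and sums the concurrency of pools reporting "healthy"; the struct mirrors the fields shown above, while `healthyCapacity` and the aggregation itself are only illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// poolStats mirrors one entry of the health response (hypothetical type name).
type poolStats struct {
	WorkerPoolID string   `json:"worker_pool_id"`
	StartedAt    int64    `json:"started_at"`
	HeartbeatAt  int64    `json:"heartbeat_at"`
	JobNames     []string `json:"job_names"`
	Concurrency  uint     `json:"concurrency"`
	Status       string   `json:"status"`
}

// healthyCapacity returns the number of healthy pools and their total concurrency.
func healthyCapacity(raw []byte) (pools int, workers uint, err error) {
	var stats []poolStats
	if err = json.Unmarshal(raw, &stats); err != nil {
		return 0, 0, err
	}
	for _, s := range stats {
		if s.Status == "healthy" {
			pools++
			workers += s.Concurrency
		}
	}
	return pools, workers, nil
}

func main() {
	raw := []byte(`[{"worker_pool_id": "pool1", "started_at": 1539164886,
		"heartbeat_at": 1539164986, "job_names": ["DEMO"],
		"concurrency": 10, "status": "healthy"}]`)
	pools, workers, err := healthyCapacity(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%d healthy pool(s), %d workers\n", pools, workers)
}
```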
It's easy to run the job service.

First, build the binary:

```shell
# under the jobservice folder
go build -a -o jobservice
```

Second, create a configuration yaml file and configure the job service.

Then, export the secret via the ENV variable CORE_SECRET.

Finally, start the service with the following command:

```shell
jobservice -c <config_yaml_file_path>
```

Enjoy it!