Back to Payload

Jobs

docs/jobs-queue/jobs.mdx

3.84.17.6 KB
Original Source

Now that we have covered Tasks and Workflows, we can tie them together with a concept called a Job.

<Banner type="default"> Whereas you define Workflows and Tasks, which control your business logic, a **Job** is an individual instance of either a Task or a Workflow which contains many tasks. </Banner>

For example, let's say we have a Workflow or Task that describes the logic to sync information from Payload to a third-party system. This is how you'd declare how to sync that info, but it wouldn't do anything on its own. In order to run that task or workflow, you'd create a Job that references the corresponding Task or Workflow.

Jobs are stored in the Payload database in the payload-jobs collection, and you can decide to keep a running list of all jobs, or configure Payload to delete the job when it has been successfully executed.

Queuing a new job

In order to queue a job, you can use the payload.jobs.queue function.

Here's how you'd queue a new Job, which will run a createPostAndUpdate workflow:

ts
const createdJob = await payload.jobs.queue({
  // Pass the name of the workflow
  workflow: 'createPostAndUpdate',
  // The input type will be automatically typed
  // according to the input you've defined for this workflow
  input: {
    title: 'my title',
  },
})

In addition to being able to queue new Jobs based on Workflows, you can also queue a job for a single Task:

ts
const createdJob = await payload.jobs.queue({
  task: 'createPost',
  input: {
    title: 'my title',
  },
})

Where to Queue Jobs

Jobs can be queued from anywhere in your application. Here are the most common scenarios:

From Collection Hooks

The most common place - queue jobs in response to document changes:

ts
{
  slug: 'posts',
  hooks: {
    afterChange: [
      async ({ req, doc, operation }) => {
        // Only send notification for published posts
        if (operation === 'update' && doc.status === 'published') {
          await req.payload.jobs.queue({
            task: 'notifySubscribers',
            input: {
              postId: doc.id,
            },
          })
        }
      },
    ],
  },
}

From Field Hooks

Queue jobs based on specific field changes:

ts
{
  name: 'featuredImage',
  type: 'upload',
  relationTo: 'media',
  hooks: {
    afterChange: [
      async ({ req, value, previousValue }) => {
        // Generate image variants when image changes
        if (value !== previousValue) {
          await req.payload.jobs.queue({
            task: 'generateImageVariants',
            input: {
              imageId: value,
            },
          })
        }
      },
    ],
  },
}

From Custom Endpoints

Queue jobs from your API routes:

ts
export const POST = async (req: PayloadRequest) => {
  const job = await req.payload.jobs.queue({
    workflow: 'generateMonthlyReport',
    input: {
      month: new Date().getMonth(),
      year: new Date().getFullYear(),
    },
  })

  return Response.json({
    message: 'Report generation queued',
    jobId: job.id,
  })
}

From Server Actions

Queue jobs from Next.js server actions:

ts
'use server'

import { getPayload } from 'payload'
import config from '@payload-config'

export async function scheduleEmail(userId: string) {
  const payload = await getPayload({ config })

  await payload.jobs.queue({
    task: 'sendEmail',
    input: { userId },
  })
}

Job Options

When queuing a job, you can pass additional options:

ts
await payload.jobs.queue({
  task: 'sendEmail',
  input: { userId: '123' },

  // Schedule the job to run in the future
  waitUntil: new Date('2024-12-25T00:00:00Z'),

  // Assign to a specific queue
  queue: 'high-priority',

  // Add custom metadata for tracking
  log: [
    {
      message: 'Email queued by admin',
      createdAt: new Date().toISOString(),
    },
  ],
})

Common options

  • waitUntil - Schedule the job to run at a specific date/time in the future
  • queue - Assign the job to a specific queue (defaults to 'default')
  • log - Add custom log entries for debugging or tracking
  • req - Pass the request context for access control

Check Job Status

After queuing a job, you can check its status:

ts
const job = await payload.jobs.queue({
  task: 'processPayment',
  input: { orderId: '123' },
})

// Later, check the job status
const updatedJob = await payload.findByID({
  collection: 'payload-jobs',
  id: job.id,
})

console.log(updatedJob.completedAt) // When it finished
console.log(updatedJob.hasError) // If it failed
console.log(updatedJob.taskStatus) // Details of each task

Job Status Fields

Each job document contains:

ts
{
  id: 'job_123',
  taskSlug: 'sendEmail',        // Or workflowSlug for workflows
  input: { userId: '123' },     // The input you provided
  completedAt: '2024-01-15...',  // When job completed (null if pending)
  hasError: false,              // True if job failed
  totalTried: 1,                // Number of attempts
  processing: false,            // True if currently running
  taskStatus: {                 // Status of each task (for workflows)
    sendEmail: {
      '1': {
        complete: true,
        output: { emailSent: true }
      }
    }
  },
  log: [                        // Execution log
    {
      message: 'Job started',
      createdAt: '...'
    }
  ]
}

Access Control

By default, Payload's job operations bypass access control when used from the Local API. You can enable access control by passing overrideAccess: false to any job operation.

To define custom access control for jobs, add an access property to your Jobs Config:

ts
import type { SanitizedConfig } from 'payload'

const config: SanitizedConfig = {
  // ...
  jobs: {
    access: {
      // Control who can queue new jobs
      queue: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
      // Control who can run jobs
      run: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
      // Control who can cancel jobs
      cancel: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
    },
  },
}

Each access control function receives the current req object and should return a boolean. If no access control is defined, the default behavior allows any authenticated user to perform the operation.

To use access control in the Local API:

ts
const req = await createLocalReq({ user }, payload)

await payload.jobs.queue({
  workflow: 'createPost',
  input: { title: 'My Post' },
  overrideAccess: false, // Enable access control
  req, // Pass the request with user context
})
<Banner type="warning"> It is not recommended to modify the `payload-jobs` collection's access control directly, as that pattern may be deprecated in future versions. Instead—use the `access` property in your Jobs Config to control job operations. </Banner>

Cancelling Jobs

Payload allows you to cancel jobs that are either queued or currently running. When cancelling a running job, the current task will finish executing, but no subsequent tasks will run. This happens because the job checks its cancellation status between tasks.

To cancel a specific job, use the payload.jobs.cancelByID method with the job's ID:

ts
await payload.jobs.cancelByID({
  id: createdJob.id,
})

To cancel multiple jobs at once, use the payload.jobs.cancel method with a Where query:

ts
await payload.jobs.cancel({
  where: {
    workflowSlug: {
      equals: 'createPost',
    },
  },
})

From within a task or workflow handler, you can also cancel the current job by throwing a JobCancelledError:

ts
throw new JobCancelledError('Job was cancelled')