docs/jobs-queue/tasks.mdx
You can register Tasks on the Payload config, and then create Jobs or Workflows that use them. Think of Tasks like tidy, isolated "functions that do one specific thing".
Tasks can be configured to retry automatically when they fail. This makes them well suited to "durable" workflows such as AI applications, where LLMs can return non-deterministic results and a call might need to be retried.
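To make the retry behavior concrete, here is a minimal sketch of what "retry up to N times" means for an async function. The `runWithRetries` helper and `flakyTask` are hypothetical names written for this illustration; they only model the idea and are not how Payload implements retries internally.

```typescript
// Hypothetical helper that models "retries: N": one initial attempt,
// plus up to `retries` additional attempts after a failure.
// Illustration only, not Payload's implementation.
async function runWithRetries<T>(
  task: (attempt: number) => Promise<T>,
  retries: number,
): Promise<{ output: T; attempts: number }> {
  let lastError: unknown
  for (let attempt = 1; attempt <= retries + 1; attempt++) {
    try {
      const output = await task(attempt)
      return { output, attempts: attempt }
    } catch (err) {
      // Remember the failure and try again if attempts remain
      lastError = err
    }
  }
  // All attempts failed: surface the most recent error
  throw lastError
}

// A stand-in for a non-deterministic call (for example an LLM request)
// that fails twice before returning a usable result.
const flakyTask = async (attempt: number): Promise<string> => {
  if (attempt < 3) {
    throw new Error(`Attempt ${attempt} returned an unusable result`)
  }
  return 'usable result'
}
```

With two retries, `runWithRetries(flakyTask, 2)` resolves on the third attempt. With one retry, it rejects with the error from the second attempt.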
Tasks can either be defined within the jobs.tasks array in your Payload config, or they can be defined inline within a workflow.
Simply add a task to the jobs.tasks array in your Payload config. A task consists of the following fields:
| Option | Description |
|---|---|
| slug | Define a slug-based name for this job. This slug needs to be unique among both tasks and workflows. |
| handler | The function that should be responsible for running the job. You can either pass a string-based path to the job function file, or the job function itself. If you are using large dependencies within your job, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. Passing a string path is an advanced feature that may require a sophisticated build pipeline in order to work. |
| inputSchema | Define the input field schema - Payload will generate a type for this schema. |
| interfaceName | You can use interfaceName to change the name of the interface that is generated for this task. By default, this is "Task" + the capitalized task slug. |
| outputSchema | Define the output field schema - Payload will generate a type for this schema. |
| label | Define a human-friendly label for this task. |
| onFail | Function to be executed if the task fails. |
| onSuccess | Function to be executed if the task succeeds. |
| retries | Specify the number of times that this step should be retried if it fails. If this is undefined, the task will either inherit the retries from the workflow or have no retries. If this is 0, the task will not be retried. By default, this is undefined. |
| concurrency | Control how jobs with the same concurrency key are handled. Jobs with the same key will run exclusively (one at a time). Requires jobs.enableConcurrencyControl: true to be set. See Concurrency Controls for details. |
| schedule | Define one or more schedules to automatically queue this task periodically. Each schedule requires a cron expression and a queue name. See Job Schedules for complete documentation. |
The logic for the Task is defined in the handler, which can be either a function or a path to a function. The handler runs once a worker picks up a Job that includes this task.
It should return an object with an output key that contains the task's output, matching the outputSchema you defined.
Example:
import { buildConfig } from 'payload'
import type { TaskConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        // Configure this task to automatically retry
        // up to two times
        retries: 2,

        // This is a unique identifier for the task
        slug: 'createPost',

        // These are the arguments that your Task will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // These are the properties that the function should output
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],

        // This is the function that is run when the task is invoked
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ],
  },
})
Tasks can be configured to run automatically on a schedule by adding the schedule property. This is useful for recurring operations like daily reports, periodic syncs, or scheduled cleanups.
How it works:
- The schedule property automatically queues jobs at specified times (no need to call payload.jobs.queue() manually)
- You still need a runner (such as autoRun) to execute the queued jobs
- The runner processes jobs from the matching queue name

Example:
export default buildConfig({
  jobs: {
    tasks: [
      {
        slug: 'dailyDigest',
        // This automatically queues the task every day at 8 AM
        schedule: [
          {
            cron: '0 8 * * *', // Every day at 8:00 AM
            queue: 'daily', // Queue to add the job to
          },
        ],
        inputSchema: [
          {
            name: 'date',
            type: 'date',
          },
        ],
        handler: async ({ req, input }) => {
          // Send daily digest emails
          const users = await req.payload.find({
            collection: 'users',
            where: { subscribed: { equals: true } },
          })

          for (const user of users.docs) {
            await req.payload.sendEmail({
              to: user.email,
              subject: 'Your Daily Digest',
              // generateDigestHTML is a helper you define elsewhere in your project
              html: generateDigestHTML(user),
            })
          }

          return {
            output: {
              emailsSent: users.docs.length,
              date: input.date || new Date().toISOString(),
            },
          }
        },
      } as TaskConfig<'dailyDigest'>,
    ],
    // Important: You also need to configure a runner to execute scheduled jobs
    autoRun: [
      {
        cron: '* * * * *', // Check for jobs every minute
        queue: 'daily', // Process jobs from 'daily' queue
        limit: 10,
      },
    ],
  },
})
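Because a scheduled job only runs when a runner processes the queue it was added to, it can help to check that every schedule.queue has a matching autoRun.queue. The sketch below does this for a plain object shaped like the jobs config above. The `findUnprocessedQueues` helper and the trimmed-down types are hypothetical and not part of Payload, and the check only looks at autoRun, so queues you process by other means would be reported as well.

```typescript
// Trimmed-down, hypothetical shapes that mirror only the fields used above
type ScheduleSketch = { cron: string; queue: string }
type TaskSketch = { slug: string; schedule?: ScheduleSketch[] }
type RunnerSketch = { cron: string; queue: string; limit?: number }
type JobsSketch = { tasks: TaskSketch[]; autoRun?: RunnerSketch[] }

// Returns every queue that a task schedules into but no autoRun entry processes
function findUnprocessedQueues(jobs: JobsSketch): string[] {
  const processed = new Set((jobs.autoRun ?? []).map((runner) => runner.queue))
  const missing = new Set<string>()
  for (const task of jobs.tasks) {
    for (const schedule of task.schedule ?? []) {
      if (!processed.has(schedule.queue)) {
        missing.add(schedule.queue)
      }
    }
  }
  return Array.from(missing)
}

const jobs: JobsSketch = {
  tasks: [
    { slug: 'dailyDigest', schedule: [{ cron: '0 8 * * *', queue: 'daily' }] },
    { slug: 'weeklyReport', schedule: [{ cron: '0 9 * * 1', queue: 'weekly' }] },
  ],
  autoRun: [{ cron: '* * * * *', queue: 'daily', limit: 10 }],
}

// 'weekly' has a schedule but no runner, so its jobs would be queued and never executed
const unprocessed = findUnprocessedQueues(jobs)
```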
Key Points:
- The schedule property automatically calls payload.jobs.queue() for you on the specified schedule
- A task can define multiple schedules in the schedule array
- The cron field uses standard cron syntax (minute, hour, day, month, day-of-week)
- schedule.queue and autoRun.queue must match for jobs to run

Common cron patterns:
// Every hour at minute 0
schedule: [{ cron: '0 * * * *', queue: 'hourly' }]
// Every day at midnight
schedule: [{ cron: '0 0 * * *', queue: 'nightly' }]
// Every Monday at 9 AM
schedule: [{ cron: '0 9 * * 1', queue: 'weekly' }]
// Every 5 minutes
schedule: [{ cron: '*/5 * * * *', queue: 'frequent' }]
// Every 3 seconds (extended cron syntax with seconds field)
schedule: [{ cron: '*/3 * * * * *', queue: 'realtime' }]
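The patterns above differ in their number of fields: five for standard cron, six when a seconds field is added. The following is a minimal sketch that labels each field, assuming fields are separated by whitespace and that the optional seconds field comes first, as in the last pattern above. `labelCronFields` is a hypothetical helper for illustration; it does not validate field values and is not part of Payload.

```typescript
const FIELD_NAMES = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'] as const

type CronFieldName = 'second' | (typeof FIELD_NAMES)[number]

// Maps each whitespace-separated cron field to a readable name
function labelCronFields(cron: string): Partial<Record<CronFieldName, string>> {
  const parts = cron.trim().split(/\s+/)
  if (parts.length !== 5 && parts.length !== 6) {
    throw new Error(`Expected 5 or 6 cron fields, received ${parts.length}`)
  }
  // Six fields means the expression starts with a seconds field
  const names: CronFieldName[] =
    parts.length === 6 ? ['second', ...FIELD_NAMES] : [...FIELD_NAMES]
  const labeled: Partial<Record<CronFieldName, string>> = {}
  names.forEach((name, index) => {
    labeled[name] = parts[index]
  })
  return labeled
}
```

For example, `labelCronFields('0 9 * * 1')` labels `0` as the minute, `9` as the hour, and `1` as the day of the week, while `labelCronFields('*/3 * * * * *')` labels `*/3` as the second.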
See Job Schedules for comprehensive scheduling documentation, including hooks, concurrency controls, and troubleshooting.
Creating or updating documents based on other document changes:
{
  slug: 'updateRelatedPosts',
  retries: 2,
  inputSchema: [
    {
      name: 'categoryId',
      type: 'relationship',
      relationTo: 'categories',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const posts = await req.payload.find({
      collection: 'posts',
      where: {
        category: {
          equals: input.categoryId,
        },
      },
    })

    // Update all posts in this category
    for (const post of posts.docs) {
      await req.payload.update({
        collection: 'posts',
        id: post.id,
        data: {
          categoryUpdatedAt: new Date().toISOString(),
        },
      })
    }

    return {
      output: {
        postsUpdated: posts.docs.length,
      },
    }
  },
}
Calling third-party services without blocking your API:
{
  slug: 'syncToThirdParty',
  retries: 3,
  inputSchema: [
    {
      name: 'documentId',
      type: 'text',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const doc = await req.payload.findByID({
      collection: 'documents',
      id: input.documentId,
    })

    // Call external API
    const response = await fetch('https://api.example.com/sync', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(doc),
    })

    if (!response.ok) {
      throw new Error(`API error: ${response.statusText}`)
    }

    return {
      output: {
        synced: true,
        apiResponse: await response.json(),
      },
    }
  },
}
Sometimes you want to fail a task based on business logic:
{
  slug: 'processPayment',
  retries: 1,
  inputSchema: [
    {
      name: 'orderId',
      type: 'text',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const order = await req.payload.findByID({
      collection: 'orders',
      id: input.orderId,
    })

    // Intentionally fail if order is already processed
    if (order.status === 'paid') {
      throw new Error('Order already processed')
    }

    // Process payment...

    return {
      output: {
        paymentId: 'payment-123',
      },
    }
  },
}
Tasks fail by throwing errors. Whenever a task encounters a failure, whether it's an unexpected error, a validation issue, or a business logic violation, you should throw an error with a descriptive message.
handler: async ({ input, req }) => {
  const order = await req.payload.findByID({
    collection: 'orders',
    id: input.orderId,
  })

  // Validation failure
  if (input.amount !== order.total) {
    throw new Error(
      `Amount mismatch: expected ${order.total}, received ${input.amount}`,
    )
  }

  // Business rule failure
  if (order.status === 'cancelled') {
    throw new Error('Cannot process payment for cancelled order')
  }

  // Conditional check
  if (order.status === 'paid') {
    throw new Error('Order already processed')
  }

  // Continue processing...
}
From within a task or workflow handler, you can prevent the entire job from being retried by throwing a JobCancelledError:
throw new JobCancelledError('Job was cancelled')
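The difference between a regular error and a cancellation can be sketched as follows. `CancelledErrorSketch` and `runWithCancel` are hypothetical stand-ins written for this illustration. They are not Payload's `JobCancelledError` or its retry logic, and the handlers are synchronous for brevity; the sketch only models the rule that a cancellation stops further retries.

```typescript
// Local stand-in for a "cancel the job" error (not Payload's class)
class CancelledErrorSketch extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'CancelledErrorSketch'
    // Keeps `instanceof` working when compiled to older JavaScript targets
    Object.setPrototypeOf(this, CancelledErrorSketch.prototype)
  }
}

type Outcome = { status: 'succeeded' | 'failed' | 'cancelled'; attempts: number }

// Retries on regular errors, but stops immediately on a cancellation
function runWithCancel(handler: () => void, retries: number): Outcome {
  for (let attempts = 1; attempts <= retries + 1; attempts++) {
    try {
      handler()
      return { status: 'succeeded', attempts }
    } catch (err) {
      if (err instanceof CancelledErrorSketch) {
        return { status: 'cancelled', attempts }
      }
      // Any other error: fall through and retry if attempts remain
    }
  }
  return { status: 'failed', attempts: retries + 1 }
}

// A regular error uses up the initial attempt plus both retries
const regular = runWithCancel(() => {
  throw new Error('Order already processed')
}, 2)

// A cancellation ends the run on the first attempt
const cancelled = runWithCancel(() => {
  throw new CancelledErrorSketch('Job was cancelled')
}, 2)
```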
After a task fails, you can inspect the job to understand what went wrong:
const job = await payload.jobs.queue({
  task: 'processPayment',
  input: { orderId: '123', amount: 100 },
})

// Run the job
await payload.jobs.run()

// Check the job status
const completedJob = await payload.findByID({
  collection: 'payload-jobs',
  id: job.id,
})

// Check if job failed
if (completedJob.hasError) {
  // Access the latest error that caused the job to fail
  console.log(completedJob.error)
  // This will contain the error message from the thrown error

  // You can also check the job log to find specific tasks that errored
  // Note: If the job was retried multiple times, there will be multiple erroring tasks in the log
  const failedTasks = completedJob.log?.filter(
    (entry) => entry.state === 'failed',
  )
}
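If a job was retried, its log can contain several failed entries. Here is a minimal sketch for summarizing them, assuming each log entry carries a `taskSlug` and a `state`, plus an optional `error`. The `LogEntrySketch` type is a simplified assumption and `countFailuresByTask` is a hypothetical helper, so check the generated types in your project for the actual shape.

```typescript
// Simplified, assumed shape of a job log entry
type LogEntrySketch = {
  taskSlug: string
  state: 'failed' | 'succeeded'
  error?: unknown
}

// Counts how many failed log entries each task produced
function countFailuresByTask(log: LogEntrySketch[] = []): Record<string, number> {
  const counts: Record<string, number> = {}
  for (const entry of log) {
    if (entry.state === 'failed') {
      counts[entry.taskSlug] = (counts[entry.taskSlug] ?? 0) + 1
    }
  }
  return counts
}

// Example log: the payment task failed twice before succeeding
const exampleLog: LogEntrySketch[] = [
  { taskSlug: 'processPayment', state: 'failed', error: 'Amount mismatch' },
  { taskSlug: 'processPayment', state: 'failed', error: 'Amount mismatch' },
  { taskSlug: 'processPayment', state: 'succeeded' },
]

const failures = countFailuresByTask(exampleLog)
```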
In addition to defining handlers as functions directly provided to your Payload config, you can also pass an absolute path to where the handler is defined. If your task has large dependencies, and you are planning on executing your jobs in a separate process that has access to the filesystem, this could be a handy way to make sure that your Payload + Next.js app remains quick to compile and has minimal dependencies.
Keep in mind that this is an advanced feature that may require a sophisticated build pipeline, especially when using it in production or within Next.js, e.g. by calling the /api/payload-jobs/run endpoint. You will have to transpile the handler files separately and ensure they are available in the same location when the job is run. If you're using an endpoint to execute your jobs, it's recommended to define your handlers as functions directly in your Payload Config, and to use import-path handlers only outside of Next.js.
In general, this is an advanced use case. Here's how this would look:
payload.config.ts:
import { fileURLToPath } from 'node:url'
import path from 'path'
import { buildConfig } from 'payload'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  jobs: {
    tasks: [
      {
        // ...
        // The #createPostHandler is a named export within the `createPost.ts` file
        handler:
          path.resolve(dirname, 'src/tasks/createPost.ts') +
          '#createPostHandler',
      },
    ],
  },
})
Then, the createPost file itself:
src/tasks/createPost.ts:
import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({
  input,
  job,
  req,
}) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}
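The handler string in the config combines a file path and an export name, separated by `#`. A minimal sketch of how such a string can be split is shown below. `splitHandlerPath` is a hypothetical helper, and falling back to the `default` export when no `#` is present is an assumption made for this illustration, not confirmed Payload behavior.

```typescript
// Splits "path/to/file.ts#exportName" into its two parts.
// Assumption: without a '#', the default export is meant.
function splitHandlerPath(handler: string): {
  filePath: string
  exportName: string
} {
  const hashIndex = handler.lastIndexOf('#')
  if (hashIndex === -1) {
    return { filePath: handler, exportName: 'default' }
  }
  return {
    filePath: handler.slice(0, hashIndex),
    exportName: handler.slice(hashIndex + 1),
  }
}

// '/app' is a placeholder project root used only for this example
const parsed = splitHandlerPath('/app/src/tasks/createPost.ts#createPostHandler')
```

Here `parsed.filePath` is `/app/src/tasks/createPost.ts` and `parsed.exportName` is `createPostHandler`.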
By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed.
You can configure this behavior through the retries.shouldRestore property. This property accepts a boolean or a function.
If shouldRestore is set to true, the task will only be re-run if it previously failed. This is the default behavior.
If shouldRestore is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.
If shouldRestore is a function, its return value determines whether the previous output is restored (true) or the task is re-run (false). This can be used for more complex restore logic, e.g. you may want to re-run a task up to X times and then restore it on subsequent runs, or only re-run a task if the input has changed.
Example:
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        retries: {
          shouldRestore: false,
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})
Example - determine whether a task should be restored based on the input data:
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        inputSchema: [
          {
            name: 'someDate',
            type: 'date',
            required: true,
          },
        ],
        retries: {
          shouldRestore: ({ input }) => {
            if (new Date(input.someDate) > new Date()) {
              return false
            }
            return true
          },
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})
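The restore rules described above can be summarized in a small decision function. This is a minimal sketch under stated assumptions: `decideAction` and `ShouldRestoreSketch` are hypothetical names, the function form is modeled with only an `input` argument, and the code mirrors the documented rules rather than Payload's internal logic.

```typescript
// Simplified model of the shouldRestore option: a boolean or a function of the input
type ShouldRestoreSketch<Input> = boolean | ((args: { input: Input }) => boolean)

// Decides whether a task is executed again or its previous output is reused
function decideAction<Input>(
  previouslySucceeded: boolean,
  shouldRestore: ShouldRestoreSketch<Input> | undefined,
  input: Input,
): 'run' | 'restore' {
  // A task that has not succeeded yet has no output to restore
  if (!previouslySucceeded) {
    return 'run'
  }
  // Default (undefined) behaves like `true`: restore the previous output
  const restore =
    typeof shouldRestore === 'function'
      ? shouldRestore({ input })
      : (shouldRestore ?? true)
  return restore ? 'restore' : 'run'
}

// Same rule as the example above: re-run only while someDate is in the future
const restoreUnlessFuture = ({ input }: { input: { someDate: string } }): boolean =>
  !(new Date(input.someDate) > new Date())
```

For a task that already succeeded, `decideAction(true, undefined, input)` yields `'restore'`, and `decideAction(true, false, input)` yields `'run'`.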
You can run sub-tasks within an existing task by using the tasks or inlineTask arguments passed to the task handler function:
export default buildConfig({
  // ...
  jobs: {
    // It is recommended to set `addParentToTaskLog` to `true` when using nested tasks, so that the parent task is included in the task log
    // This allows for better observability and debugging of the task execution
    addParentToTaskLog: true,
    tasks: [
      {
        slug: 'parentTask',
        inputSchema: [
          {
            name: 'text',
            type: 'text',
          },
        ],
        handler: async ({ input, req, tasks, inlineTask }) => {
          // Run a sub-task that is defined inline
          await inlineTask('Sub Task 1', {
            task: () => {
              // Do something
              return {
                output: {},
              }
            },
          })

          // Run a registered task as a sub-task
          // (assumes a task with the slug 'CreateSimple' exists in jobs.tasks)
          await tasks.CreateSimple('Sub Task 2', {
            input: { message: 'hello' },
          })

          return {
            output: {},
          }
        },
      } as TaskConfig<'parentTask'>,
    ],
  },
})