Tasks

docs/jobs-queue/tasks.mdx

<Banner type="default"> A **"Task"** is a function definition that performs business logic and whose input and output are both strongly typed. </Banner>

You can register Tasks on the Payload config, and then create Jobs or Workflows that use them. Think of Tasks like tidy, isolated "functions that do one specific thing".

Payload Tasks can be configured to retry automatically if they fail, which makes them valuable for "durable" workflows like AI applications, where LLMs can return non-deterministic results and calls might need to be retried.

Tasks can either be defined within the jobs.tasks array in your Payload config, or they can be defined inline within a workflow.

Defining tasks in the config

Simply add a task to the jobs.tasks array in your Payload config. A task consists of the following fields:

| Option | Description |
| --- | --- |
| `slug` | A unique, slug-based name for this task. The slug must be unique among both tasks and workflows. |
| `handler` | The function responsible for running the job. You can pass either a string-based path to the job function file or the job function itself. If your job uses large dependencies, you might prefer to pass the string path, as that avoids bundling those dependencies into your Next.js app. Passing a string path is an advanced feature that may require a sophisticated build pipeline in order to work. |
| `inputSchema` | Define the input field schema. Payload will generate a type for this schema. |
| `interfaceName` | Change the name of the interface that is generated for this task. By default, this is `"Task"` plus the capitalized task slug. |
| `outputSchema` | Define the output field schema. Payload will generate a type for this schema. |
| `label` | A human-friendly label for this task. |
| `onFail` | Function to be executed if the task fails. |
| `onSuccess` | Function to be executed if the task succeeds. |
| `retries` | The number of times this task should be retried if it fails. If `undefined`, the task either inherits the retries from the workflow or has no retries. If `0`, the task will not be retried. Defaults to `undefined`. |
| `concurrency` | Control how jobs with the same concurrency key are handled. Jobs with the same key run exclusively (one at a time). Requires `jobs.enableConcurrencyControl: true` to be set. See Concurrency Controls for details. |
| `schedule` | One or more schedules for automatically queueing this task periodically. Each schedule requires a cron expression and a queue name. See Job Schedules for complete documentation. |

The logic for the Task is defined in the handler, which can be a function or a path to a function. The handler runs once a worker picks up a Job that includes this task.

It should return an object with an output key containing the task's output, matching the outputSchema you've defined.

Example:

ts
import { buildConfig } from 'payload'
import type { TaskConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        // Configure this task to automatically retry
        // up to two times
        retries: 2,

        // This is a unique identifier for the task
        slug: 'createPost',

        // These are the arguments that your Task will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // These are the properties that the function should output
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],

        // This is the function that is run when the task is invoked
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ],
  },
})

Scheduling Tasks to Run Automatically

Tasks can be configured to run automatically on a schedule by adding the schedule property. This is useful for recurring operations like daily reports, periodic syncs, or scheduled cleanups.

How it works:

  1. The schedule property automatically queues jobs at specified times (no need to call payload.jobs.queue() manually)
  2. You still need to configure a job runner (like autoRun) to execute the queued jobs
  3. Both the schedule and runner must use the same queue name

Example:

ts
import { buildConfig } from 'payload'
import type { TaskConfig } from 'payload'

export default buildConfig({
  jobs: {
    tasks: [
      {
        slug: 'dailyDigest',

        // This automatically queues the task every day at 8 AM
        schedule: [
          {
            cron: '0 8 * * *', // Every day at 8:00 AM
            queue: 'daily', // Queue to add the job to
          },
        ],

        inputSchema: [
          {
            name: 'date',
            type: 'date',
          },
        ],

        handler: async ({ req, input }) => {
          // Send daily digest emails
          const users = await req.payload.find({
            collection: 'users',
            where: { subscribed: { equals: true } },
          })

          for (const user of users.docs) {
            await req.payload.sendEmail({
              to: user.email,
              subject: 'Your Daily Digest',
              html: generateDigestHTML(user), // hypothetical helper, defined elsewhere
            })
          }

          return {
            output: {
              emailsSent: users.docs.length,
              date: input.date || new Date().toISOString(),
            },
          }
        },
      } as TaskConfig<'dailyDigest'>,
    ],

    // Important: You also need to configure a runner to execute scheduled jobs
    autoRun: [
      {
        cron: '* * * * *', // Check for jobs every minute
        queue: 'daily', // Process jobs from 'daily' queue
        limit: 10,
      },
    ],
  },
})
<Banner type="warning"> **Important:** The `schedule` property only **queues** jobs—it doesn't execute them. You must also configure a job runner (like `autoRun` above) to actually run the queued jobs. Both must use the same `queue` name. </Banner>

Key Points:

  • The schedule property automatically calls payload.jobs.queue() for you on the specified schedule
  • You can define multiple schedules per task by adding more objects to the schedule array
  • The cron field uses standard cron syntax (minute, hour, day-of-month, month, day-of-week)
  • Both schedule.queue and autoRun.queue must match for jobs to run
  • Scheduling is handled automatically by Payload's scheduler—no manual intervention needed

Common cron patterns:

ts
// Every hour at minute 0
schedule: [{ cron: '0 * * * *', queue: 'hourly' }]

// Every day at midnight
schedule: [{ cron: '0 0 * * *', queue: 'nightly' }]

// Every Monday at 9 AM
schedule: [{ cron: '0 9 * * 1', queue: 'weekly' }]

// Every 5 minutes
schedule: [{ cron: '*/5 * * * *', queue: 'frequent' }]

// Every 3 seconds (extended cron syntax with seconds field)
schedule: [{ cron: '*/3 * * * * *', queue: 'realtime' }]
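To make the five-field semantics concrete, here is a minimal, illustrative matcher. This is not Payload's scheduler (which supports the full cron grammar, including ranges, lists, and the optional seconds field shown above); it only handles plain numbers, `*`, and `*/n`:

```ts
// Illustrative only: checks a date against the five standard cron fields
// (minute, hour, day-of-month, month, day-of-week). Supports "*", "*/n",
// and plain numbers -- not ranges, lists, or the extended seconds field.
function fieldMatches(field: string, value: number): boolean {
  if (field === '*') return true
  if (field.startsWith('*/')) return value % Number(field.slice(2)) === 0
  return Number(field) === value
}

function cronMatches(expr: string, date: Date): boolean {
  const [minute, hour, dayOfMonth, month, dayOfWeek] = expr.split(' ')
  return (
    fieldMatches(minute, date.getMinutes()) &&
    fieldMatches(hour, date.getHours()) &&
    fieldMatches(dayOfMonth, date.getDate()) &&
    fieldMatches(month, date.getMonth() + 1) && // cron months are 1-12
    fieldMatches(dayOfWeek, date.getDay()) // 0 = Sunday
  )
}
```

For example, `cronMatches('0 9 * * 1', date)` is true only when `date` falls on a Monday at exactly 9:00.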

See Job Schedules for comprehensive scheduling documentation, including hooks, concurrency controls, and troubleshooting.

Common Task Patterns

Database Operations

Creating or updating documents based on other document changes:

ts
{
  slug: 'updateRelatedPosts',
  retries: 2,
  inputSchema: [
    {
      name: 'categoryId',
      type: 'relationship',
      relationTo: 'categories',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const posts = await req.payload.find({
      collection: 'posts',
      where: {
        category: {
          equals: input.categoryId,
        },
      },
    })

    // Update all posts in this category
    for (const post of posts.docs) {
      await req.payload.update({
        collection: 'posts',
        id: post.id,
        data: {
          categoryUpdatedAt: new Date().toISOString(),
        },
      })
    }

    return {
      output: {
        postsUpdated: posts.docs.length,
      },
    }
  },
}

External API Calls

Calling third-party services without blocking your API:

ts
{
  slug: 'syncToThirdParty',
  retries: 3,
  inputSchema: [
    {
      name: 'documentId',
      type: 'text',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const doc = await req.payload.findByID({
      collection: 'documents',
      id: input.documentId,
    })

    // Call external API
    const response = await fetch('https://api.example.com/sync', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(doc),
    })

    if (!response.ok) {
      throw new Error(`API error: ${response.statusText}`)
    }

    return {
      output: {
        synced: true,
        apiResponse: await response.json(),
      },
    }
  },
}

Conditional Failure

Sometimes you want to fail a task based on business logic:

ts
{
  slug: 'processPayment',
  retries: 1,
  inputSchema: [
    {
      name: 'orderId',
      type: 'text',
      required: true,
    },
  ],
  handler: async ({ input, req }) => {
    const order = await req.payload.findByID({
      collection: 'orders',
      id: input.orderId,
    })

    // Intentionally fail if order is already processed
    if (order.status === 'paid') {
      throw new Error('Order already processed')
    }

    // Process payment...

    return {
      output: {
        paymentId: 'payment-123',
      },
    }
  },
}

Handling Task Failures

Tasks fail by throwing errors. When a task encounters any type of failure—whether it's an unexpected error, a validation issue, or a business logic violation—you should throw an error with a descriptive message.

ts
handler: async ({ input, req }) => {
  const order = await req.payload.findByID({
    collection: 'orders',
    id: input.orderId,
  })

  // Validation failure
  if (input.amount !== order.total) {
    throw new Error(
      `Amount mismatch: expected ${order.total}, received ${input.amount}`,
    )
  }

  // Business rule failure
  if (order.status === 'cancelled') {
    throw new Error('Cannot process payment for cancelled order')
  }

  // Conditional check
  if (order.status === 'paid') {
    throw new Error('Order already processed')
  }

  // Continue processing...
}

Preventing Job Retries

From within a task or workflow handler, you can prevent the entire job from being retried by throwing a JobCancelledError:

ts
throw new JobCancelledError('Job was cancelled')
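The difference between an ordinary error (retryable) and a cancellation (not retryable) can be sketched with a simplified, synchronous model of a runner's control flow. The `JobCancelledError` class below is a local stand-in defined so the sketch is self-contained, not Payload's implementation:

```ts
// Stand-in for Payload's JobCancelledError, defined locally so this
// sketch is self-contained. Throwing it signals "do not retry".
class JobCancelledError extends Error {}

type RunResult = { status: 'succeeded' | 'failed' | 'cancelled'; attempts: number }

// Simplified model: ordinary errors consume retries (1 initial attempt
// plus `retries` re-runs); a cancellation stops the job immediately.
function runWithRetries(handler: () => void, retries: number): RunResult {
  let attempts = 0
  while (attempts <= retries) {
    attempts++
    try {
      handler()
      return { status: 'succeeded', attempts }
    } catch (err) {
      if (err instanceof JobCancelledError) {
        return { status: 'cancelled', attempts }
      }
      // ordinary failure: loop again if attempts remain
    }
  }
  return { status: 'failed', attempts }
}
```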

Accessing Failure Information

After a task fails, you can inspect the job to understand what went wrong:

ts
const job = await payload.jobs.queue({
  task: 'processPayment',
  input: { orderId: '123', amount: 100 },
})

// Run the job
await payload.jobs.run()

// Check the job status
const completedJob = await payload.findByID({
  collection: 'payload-jobs',
  id: job.id,
})

// Check if job failed
if (completedJob.hasError) {
  // Access the latest error that caused the job to fail
  console.log(completedJob.error)
  // This will contain the error message from the thrown error

  // You can also check the job log to find specific tasks that errored
  // Note: If the job was retried multiple times, there will be multiple erroring tasks in the log
  const failedTasks = completedJob.log?.filter(
    (entry) => entry.state === 'failed',
  )
}
<Banner type="info"> Always throw errors with descriptive messages for better debugging and observability. The error message will be stored in the job's error field and visible in the admin UI. </Banner>

Understanding Task Execution

When a task runs

  1. The job is picked up from the queue by a worker
  2. The handler function executes with the provided input
  3. If successful, the output is stored and the job completes
  4. If it throws an error, the task will retry (up to retries count)
  5. After all retries are exhausted, the task and job fail
<Banner type="warning"> Important: Tasks should be idempotent when possible - meaning running them multiple times with the same input produces the same result. This is because retries might cause the task to run more than once. </Banner>
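The idempotency advice above can be sketched as keying the side effect on the input, so a retried run reuses the earlier result instead of duplicating it. The `Map` here is a stand-in for a database lookup:

```ts
// Idempotency sketch: a retried "createPost" with the same title finds
// the row created by the earlier attempt instead of inserting a second one.
const postsByTitle = new Map<string, { id: string; title: string }>()

function createPostIdempotent(title: string): { postID: string } {
  const existing = postsByTitle.get(title)
  if (existing) {
    // A retry after a partial failure lands here and reuses the row
    return { postID: existing.id }
  }
  const post = { id: `post-${postsByTitle.size + 1}`, title }
  postsByTitle.set(title, post)
  return { postID: post.id }
}
```

In a real task, the equivalent pattern is a find-then-create (or upsert) against a unique key derived from the task input.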

Advanced: Handler File Paths

In addition to defining handlers as functions directly in your Payload config, you can pass an absolute path to the file where the handler is defined. If your task has large dependencies, and you are planning on executing your jobs in a separate process that has access to the filesystem, this can be a handy way to keep your Payload + Next.js app quick to compile and light on dependencies.

Keep in mind that this is an advanced feature that may require a sophisticated build pipeline, especially when using it in production or within Next.js, e.g. by calling the /api/payload-jobs/run endpoint. You will have to transpile the handler files separately and ensure they are available in the same location when the job is run. If you're using an endpoint to execute your jobs, it's recommended to define your handlers as functions directly in your Payload config, or to use import path handlers outside of Next.js.

In general, this is an advanced use case. Here's how this would look:

payload.config.ts:

ts
import { fileURLToPath } from 'node:url'
import path from 'path'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  jobs: {
    tasks: [
      {
        // ...
        // The #createPostHandler is a named export within the `createPost.ts` file
        handler:
          path.resolve(dirname, 'src/tasks/createPost.ts') +
          '#createPostHandler',
      },
    ],
  },
})

Then, the createPost file itself:

src/tasks/createPost.ts:

ts
import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({
  input,
  job,
  req,
}) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}

Configuring task restoration

By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed.
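This default behavior is essentially output memoization keyed on the task. A simplified model (Payload persists this state in the job log; the `Map` below is a stand-in):

```ts
// Simplified model of task restoration: the output of a successful run
// is stored per task ID, and a re-run returns the stored output instead
// of executing the handler again.
const completedTaskOutputs = new Map<string, unknown>()

function runTask<O>(taskID: string, handler: () => O): O {
  if (completedTaskOutputs.has(taskID)) {
    // Restored: the handler is skipped entirely
    return completedTaskOutputs.get(taskID) as O
  }
  const output = handler()
  completedTaskOutputs.set(taskID, output)
  return output
}
```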

You can configure this behavior through the retries.shouldRestore property. This property accepts a boolean or a function.

If shouldRestore is set to true, the task will only be re-run if it previously failed. This is the default behavior.

If shouldRestore is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.

If shouldRestore is a function, its return value determines whether the task should be re-run. This can be used for more complex restore logic, e.g. you may want to re-run a task up to X times and then restore it for consecutive runs, or only re-run a task if its input has changed.

Example:

ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        retries: {
          shouldRestore: false,
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})

Example - determine whether a task should be restored based on the input data:

ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        inputSchema: [
          {
            name: 'someDate',
            type: 'date',
            required: true,
          },
        ],
        retries: {
          shouldRestore: ({ input }) => {
            if (new Date(input.someDate) > new Date()) {
              return false
            }
            return true
          },
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})

Nested tasks

You can run sub-tasks within an existing task by using the tasks or inlineTask arguments passed to the task handler function:

ts
export default buildConfig({
  // ...
  jobs: {
    // It is recommended to set `addParentToTaskLog` to `true` when using nested tasks, so that the parent task is included in the task log
    // This allows for better observability and debugging of the task execution
    addParentToTaskLog: true,
    tasks: [
      {
        slug: 'parentTask',
        inputSchema: [
          {
            name: 'text',
            type: 'text',
          },
        ],
        handler: async ({ input, req, tasks, inlineTask }) => {
          await inlineTask('Sub Task 1', {
            task: () => {
              // Do something
              return {
                output: {},
              }
            },
          })

          await tasks.CreateSimple('Sub Task 2', {
            input: { message: 'hello' },
          })

          return {
            output: {},
          }
        },
      } as TaskConfig<'parentTask'>,
    ],
  },
})