Workflows

<Banner type="default"> A **"Workflow"** is an optional way to *combine multiple tasks together* in a way that can be gracefully retried from the point of failure. </Banner>

Workflows are most helpful when you have multiple tasks that need to run in a row, and you want each task to be retryable if it fails.

If a task within a workflow fails, the Workflow will automatically "pick back up" on the task where it failed, without re-executing any prior tasks that already completed.

Why use Workflows?

Single Task vs Workflow

If you only need to run one operation, use a single Task. But if you need multiple steps that depend on each other, use a Workflow.

Example scenario: When a user signs up, you need to:

  1. Create their user profile
  2. Send a welcome email
  3. Add them to your email marketing list

Without a workflow, if step 2 fails, you'd have to:

  • Re-run all three steps (wasting resources)
  • Manually track which steps succeeded
  • Risk creating duplicate profiles or sending duplicate emails

With a workflow:

  • If step 2 fails, only step 2 retries (steps 1 and 3 don't re-run)
  • All task outputs are automatically tracked
  • The workflow "resumes" from the failure point

Defining a workflow

The most important aspect of a Workflow is the handler, where you declare when and how your tasks should run by calling the strongly typed task-runner functions passed to it. If any task within the workflow fails, the entire handler function will re-run.

Importantly, however, tasks that have already completed successfully will simply return their cached, saved output without running again. The Workflow will pick back up where it failed, and only tasks from the failure point onward will be re-executed.

To define a JS-based workflow, simply add a workflow to the `jobs.workflows` array in your Payload config. A workflow consists of the following fields:

| Option | Description |
| --- | --- |
| `slug` | Define a slug-based name for this workflow. This slug needs to be unique among both tasks and workflows. |
| `handler` | The function responsible for running the workflow. You can pass either a string-based path to the workflow function file or the workflow function itself. If you are using large dependencies within your workflow, you might prefer to pass the string path, as that will avoid bundling large dependencies in your Next.js app. Passing a string path is an advanced feature that may require a sophisticated build pipeline in order to work. |
| `inputSchema` | Define the input field schema - Payload will generate a type for this schema. |
| `interfaceName` | You can use `interfaceName` to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. |
| `label` | Define a human-friendly label for this workflow. |
| `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |
| `retries` | You can define retries at the workflow level, which enforces that the workflow can only fail up to that number of retries. If a task does not have retries specified, it inherits the retry count from the workflow. Setting workflow retries to 0 disregards all task retry specifications and fails the entire workflow on any task failure. Leaving it undefined (the default) means the workflow respects each task's own retry count. |
| `concurrency` | Control how jobs with the same concurrency key are handled. Jobs with the same key will run exclusively (one at a time). Requires `jobs.enableConcurrencyControl: true` to be set. See Concurrency Controls below for details. |

Example:

```ts
import type { WorkflowConfig } from 'payload'
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',

        // The arguments that the workflow will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // The handler that defines the "control flow" of the workflow
        // Notice how it uses the `tasks` argument to execute your predefined tasks.
        // These are strongly typed!
        handler: async ({ job, tasks }) => {

          // This workflow first runs a task called `createPost`.

          // You need to define a unique ID for this task invocation
          // that will always be the same if this workflow fails
          // and is re-executed in the future. Here, we hard-code it to '1'
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // Once the prior task completes, it will run a task
          // called `updatePost`
          await tasks.updatePost('2', {
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // or output.postID
              title: job.input.title + '2',
            },
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ]
  }
})
```
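
If you opt for a string-based `handler` path to keep heavy dependencies out of your Next.js bundle, it might look like the following sketch. The file path and export name here are hypothetical; the `#` suffix tells Payload which named export in that file is the handler:

```ts
import path from 'path'
import { fileURLToPath } from 'url'

import { buildConfig } from 'payload'

const dirname = path.dirname(fileURLToPath(import.meta.url))

export default buildConfig({
  // ...
  jobs: {
    workflows: [
      {
        slug: 'createPostAndUpdate',
        // Hypothetical file and export name - Payload resolves the handler
        // at runtime, so its dependencies stay out of your app bundle
        handler:
          path.resolve(dirname, 'workflows/createPostAndUpdate.ts') +
          '#createPostAndUpdateHandler',
      },
    ],
  },
})
```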

Running tasks inline

In the above example, our workflow was executing tasks that we already had defined in our Payload config. But, you can also run tasks without predefining them.

To do this, you can use the inlineTask function.

The drawbacks of this approach are that tasks cannot be re-used across workflows as easily, and the task data stored in the job will not be typed. In the following example, the inline task data will be stored on the job under job.taskStatus.inline['2'] but completely untyped, as types for dynamic tasks like these cannot be generated beforehand.

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, tasks, inlineTask }) => {
          // Here, we run a predefined task.
          // The `createPost` handler arguments and return type
          // are both strongly typed
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // Here, this task is not defined in the Payload config
          // and is "inline". Its output will be stored on the Job in the database
          // however its arguments will be untyped.
          const { newPost } = await inlineTask('2', {
            // Inline tasks can define their own retry count
            retries: 3,
            task: async ({ req }) => {
              const newPost = await req.payload.update({
                collection: 'post',
                id: '2',
                req,
                data: {
                  title: 'updated!',
                },
              })
              return {
                output: {
                  newPost
                },
              }
            },
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ]
  }
})
```

Understanding Workflow failure & recovery

One of the most powerful features of workflows is how they handle failures. Let's walk through what actually happens:

Example workflow:

```ts
handler: async ({ job, tasks }) => {
  await tasks.createProfile('step1', { input: { userId: '123' } })
  await tasks.sendEmail('step2', { input: { userId: '123' } })
  await tasks.addToList('step3', { input: { userId: '123' } })
}
```

Scenario: Email service is down

First execution attempt:

  • Step 1 (createProfile) succeeds → Profile created in database
  • Step 2 (sendEmail) fails → Email service timeout
  • Step 3 (addToList) never runs → Workflow pauses

The job is marked for retry. Task 2 has `retries: 3`, so it will be attempted again.

Second execution attempt (automatic retry):

  • Step 1 skipped → Returns cached output from first run (no duplicate profile)
  • Step 2 retries → Email service is back up, succeeds!
  • Step 3 runs → User added to mailing list
<Banner type="default"> The entire handler function re-runs, but completed tasks return their cached results immediately without re-executing their logic. </Banner>
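
The `retries: 3` referenced in this scenario is declared on the task definition itself. A minimal sketch of what the `sendEmail` task might look like (the slug and fields here are illustrative):

```ts
import type { TaskConfig } from 'payload'

// Hypothetical `sendEmail` task - the retry count lives on the task,
// and the workflow can inherit or override it as described above
export const sendEmail: TaskConfig = {
  slug: 'sendEmail',
  retries: 3,
  inputSchema: [{ name: 'userId', type: 'text', required: true }],
  handler: async ({ input, req }) => {
    // ... call your email provider here ...
    return { output: { sent: true } }
  },
}
```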

Accessing Task outputs

Tasks can pass data to subsequent tasks through their outputs:

```ts
handler: async ({ job, tasks }) => {
  // Task 1: Create a document and return its ID
  await tasks.createDocument('create-doc', {
    input: { title: 'My Document' },
  })

  // Access the output from task 1 in two ways:

  // Method 1: Through job.taskStatus
  const docId = job.taskStatus.createDocument['create-doc'].output.documentId

  // Method 2: Capture the return value directly
  const result = await tasks.createDocument('create-doc', {
    input: { title: 'My Document' },
  })
  const docId2 = result.output.documentId

  // Use the output in task 2
  await tasks.updateDocument('update-doc', {
    input: {
      documentId: docId,
      status: 'published',
    },
  })
}
```

Task status structure:

```ts
job.taskStatus = {
  [taskSlug]: {
    [taskId]: {
      input: {
        /* the input you passed */
      },
      output: {
        /* the output returned by the task */
      },
      complete: true,
      totalTried: 1,
    },
  },
}
```

Workflow best practices

Use descriptive task IDs

```ts
// Hard to debug
await tasks.sendEmail('1', { input })
await tasks.updateUser('2', { input })

// Clear and maintainable
await tasks.sendEmail('send-welcome-email', { input })
await tasks.updateUser('mark-onboarding-complete', { input })
```

Keep tasks small and focused

```ts
// Task does too much
{
  slug: 'onboardUser',
  handler: async ({ input }) => {
    await createProfile(input)
    await sendEmail(input)
    await addToMailingList(input)
    // All-or-nothing - if email fails, everything fails
  }
}

// Separate tasks with individual retry logic
await tasks.createProfile('create-profile', { input })
await tasks.sendEmail('send-email', { input }) // Can retry independently
await tasks.addToMailingList('add-to-list', { input })
```

Pass IDs, not entire objects

```ts
// Passing large objects
await tasks.processUser('process', {
  input: {
    user: {
      /* entire user object with all fields */
    },
  },
})

// Pass just the ID
await tasks.processUser('process', {
  input: {
    userId: '123',
  },
})
// Task fetches what it needs: await req.payload.findByID(...)
```

Set appropriate retry counts

  • External APIs (email, payment processors): Higher retries (3-5) - services can be temporarily unavailable
  • Database operations: Lower retries (1-2) - usually succeed or fail permanently
  • Idempotent operations (safe to run multiple times): Higher retries are safe
  • Non-idempotent operations (creates, charges, sends): Lower retries to avoid duplicates
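
As a sketch of how these guidelines might translate into task definitions (the slugs and handlers below are hypothetical):

```ts
import { buildConfig } from 'payload'

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        // Non-idempotent: keep retries low to avoid duplicate charges
        slug: 'chargeCard',
        retries: 0,
        handler: async ({ input }) => {
          // ... create the charge ...
          return { output: { charged: true } }
        },
      },
      {
        // External email API: transient outages are common, so retry more
        slug: 'sendReceiptEmail',
        retries: 4,
        handler: async ({ input }) => {
          // ... send the receipt ...
          return { output: { sent: true } }
        },
      },
    ],
  },
})
```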

Handle errors with context

```ts
handler: async ({ input, req }) => {
  try {
    const result = await fetch('https://api.example.com/data')
    if (!result.ok) {
      throw new Error(`API returned ${result.status}: ${result.statusText}`)
    }
    return { output: { success: true } }
  } catch (error) {
    // Provide context about what failed and why
    throw new Error(
      `Failed to sync data for user ${input.userId}: ${error.message}`,
    )
  }
}
```

Concurrency Controls

When multiple jobs operate on the same resource, race conditions can occur. For example, if a user creates a document and then quickly updates it, two jobs might be queued that both try to process the same document simultaneously, leading to unexpected results.

The concurrency option allows you to prevent this by ensuring that jobs with the same "key" run exclusively (one at a time).

<Banner type="warning"> **Important:** To use concurrency controls, you must first enable them in your Payload config by setting `jobs.enableConcurrencyControl: true`. This adds an indexed `concurrencyKey` field to your jobs collection schema and may require a database migration depending on your database adapter. </Banner>

Enabling Concurrency Controls

First, enable the feature in your Payload config:

```ts
export default buildConfig({
  jobs: {
    enableConcurrencyControl: true,
    // ... your tasks and workflows
  },
})
```

Then add the concurrency option to your workflow configuration:

```ts
export default buildConfig({
  jobs: {
    workflows: [
      {
        slug: 'syncDocument',
        inputSchema: [{ name: 'documentId', type: 'text', required: true }],
        // Jobs with the same concurrency key run one at a time
        concurrency: ({ input }) => `sync:${input.documentId}`,
        handler: async ({ job, inlineTask }) => {
          await inlineTask('fetch-and-update', {
            task: async ({ req }) => {
              // This runs exclusively - no other job for the same
              // documentId can run at the same time
              const doc = await req.payload.findByID({
                collection: 'posts',
                id: job.input.documentId,
              })

              await req.payload.update({
                collection: 'posts',
                id: job.input.documentId,
                data: { syncedAt: new Date().toISOString() },
              })

              return { output: { synced: true } }
            },
          })
        },
      },
    ],
  },
})
```

How It Works

When you define a concurrency key:

  1. When queuing: The concurrency key is computed from the job's input and stored on the job document.

  2. When running: The job runner enforces exclusive execution through two mechanisms:

    • It first checks which concurrency keys are currently being processed and excludes pending jobs with those keys from the query
    • If multiple pending jobs with the same key are picked up in the same batch, only the first one (by creation order) runs - the others are released back to `processing: false` and will be picked up on subsequent runs
  3. Result: Jobs with the same concurrency key are guaranteed to run sequentially, never in parallel. All jobs are preserved and will eventually complete - they just wait their turn.
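
For example, queuing two jobs with the same input produces the same computed key, so they are guaranteed to run one at a time. A sketch, assuming an initialized `payload` instance and the `syncDocument` workflow from above:

```ts
// Both jobs compute the key `sync:doc-123`, so the runner will never
// execute them in parallel - the second waits for the first to finish
await payload.jobs.queue({
  workflow: 'syncDocument',
  input: { documentId: 'doc-123' },
})

await payload.jobs.queue({
  workflow: 'syncDocument',
  input: { documentId: 'doc-123' },
})
```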

Concurrency Configuration Options

The concurrency option accepts either a function (shorthand) or an object with more options:

Shorthand (function only):

```ts
// Exclusive defaults to true, supersedes defaults to false
concurrency: ({ input }) => `my-key:${input.resourceId}`
```

Full configuration:

```ts
concurrency: {
  // Function that returns a key to group related jobs
  // The queue name is provided to allow for queue-specific keys if needed
  key: ({ input, queue }) => `my-key:${input.resourceId}`,

  // Only one job with this key can run at a time
  // @default true
  exclusive: true,

  // Delete older pending jobs when a new job is queued
  // @default false
  supersedes: false,
}
```

Common Patterns

1. Exclusive only (preserve all jobs):

```ts
concurrency: {
  key: ({ input }) => `process:${input.documentId}`,
  exclusive: true,
  supersedes: false, // All jobs run, just not in parallel
}
```

Use when every job represents unique work that must complete (e.g., processing distinct versions of a document).

2. Exclusive + Supersedes (last queued wins):

```ts
concurrency: {
  key: ({ input }) => `generate:${input.documentId}`,
  exclusive: true,
  supersedes: true, // Only latest job runs
}
```

Use when only the latest state matters (e.g., regenerating embeddings after rapid edits - intermediate states can be skipped).

3. Queue-specific concurrency:

```ts
concurrency: {
  key: ({ input, queue }) => `${queue}:sync:${input.resourceId}`,
}
```

Include the queue name to allow the same resource to be processed concurrently in different queues.

Supersedes Behavior

When supersedes: true is set, newly queued jobs will automatically delete older pending (not yet running) jobs with the same concurrency key:

Example scenario:

1. Job A queued → pending
2. Job B queued → A deleted, B pending
3. Job C queued → B deleted, C pending
4. Only Job C runs

Configuration:

```ts
concurrency: {
  key: ({ input }) => `generate:${input.documentId}`,
  exclusive: true,  // Still enforced
  supersedes: true, // Delete older pending jobs
}
```

When to use:

  • Data regeneration (embeddings, thumbnails) after rapid edits
  • Report generation where only the latest parameters matter
  • Any scenario where intermediate pending jobs are made obsolete by newer ones

Important notes:

  • Only pending jobs (not yet running) are deleted
  • If a job is already running, it completes normally and the new job waits
  • Without exclusive: true, supersedes still deletes pending jobs but won't prevent parallel execution
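
To illustrate, rapid successive edits might each queue a regeneration job. With `supersedes: true`, each newly queued job deletes the previous pending one (a sketch, assuming an initialized `payload` instance and a hypothetical `generateEmbeddings` workflow configured as above):

```ts
// Each call computes the key `generate:doc-123`. Queuing a new job
// deletes the older pending job with the same key, so after these
// three calls only the last one remains to run.
await payload.jobs.queue({ workflow: 'generateEmbeddings', input: { documentId: 'doc-123' } })
await payload.jobs.queue({ workflow: 'generateEmbeddings', input: { documentId: 'doc-123' } })
await payload.jobs.queue({ workflow: 'generateEmbeddings', input: { documentId: 'doc-123' } })
```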

Important Considerations

  • Key uniqueness: The concurrency key should uniquely identify the resource being operated on. Include all relevant identifiers (collection slug, document ID, locale, etc.).

  • Global by default: By default, concurrency is global across all queues. A job with key sync:doc1 in the default queue will block a job with the same key in the emails queue. Include the queue name in your key if you want queue-specific concurrency.

  • No concurrency key = no restrictions: Jobs without a concurrency configuration run in parallel as before.

  • Pending jobs wait: Jobs that can't run due to concurrency constraints remain in the queue with `processing: false` and will be picked up on subsequent runs.
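
For instance, a key for a localized document operation might combine all of those identifiers (the input field names here are hypothetical):

```ts
// Combines collection slug, document ID, and locale, so the same document
// in different locales can still be processed in parallel
concurrency: {
  key: ({ input }) => `posts:${input.documentId}:${input.locale}`,
}
```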