Back to Medusa

{metadata.title}

www/apps/resources/app/infrastructure-modules/workflow-engine/how-to-use/page.mdx

2.14.213.8 KB
Original Source

import { TypeList } from "docs-ui"

export const metadata = { title: How to Use the Workflow Engine Module, }

{metadata.title}

In this document, you’ll learn about the different methods in the Workflow Engine Module's service and how to use them.


Resolve Workflow Engine Module's Service

In your workflow's step, you can resolve the Workflow Engine Module's service from the Medusa container:

ts
import { Modules } from "@medusajs/framework/utils"
import { createStep } from "@medusajs/framework/workflows-sdk"

const step1 = createStep(
  "step-1",
  async ({}, { container }) => {
    const workflowEngineModuleService = container.resolve(
      Modules.WORKFLOW_ENGINE
    )
    
    // TODO use workflowEngineModuleService
  } 
)

This will resolve the service of the configured Workflow Engine Module, which is the In-Memory Workflow Engine Module by default.

You can then use the Workflow Engine Module's service's methods in the step. The rest of this guide details these methods.


retryStep

This method retries a step that has temporary failed, such as a step that has autoRetry set to false or when the machine running the Medusa application shuts down. Learn more about it in the Retry Failed Steps guide.

<Note>

This method is available since Medusa v2.10.2.

</Note>

Example

ts
// other imports...
import {
  TransactionHandlerType,
} from "@medusajs/framework/utils"

workflowEngine.retryStep({
  idempotencyKey: {
    action: TransactionHandlerType.INVOKE,
    transactionId,
    stepId: "step-1",
    workflowId: "hello-world",
  },
})

Parameters

<TypeList types={[ { "name": "idempotencyKey", "type": "object", "description": "The details of the step to retry.", "optional": false, "defaultValue": "", "expandable": false, "children": [ { "name": "action", "type": "invoke | compensate", "description": "If the step's compensation function is running, use compensate. Otherwise, use invoke.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "transactionId", "type": "string", "description": "The ID of the workflow execution's transaction.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "stepId", "type": "string", "description": "The ID of the step to change its status. This is the first parameter passed to createStep when creating the step.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "workflowId", "type": "string", "description": "The ID of the workflow. This is the first parameter passed to createWorkflow when creating the workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] }, { "name": "options", "type": "object", "description": "Options to pass to the step.", "optional": true, "defaultValue": "", "expandable": false, "children": [ { "name": "container", "type": "Container", "description": "An instance of the Medusa container.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] } ]} expandUrl="https://docs.medusajs.com/learn/fundamentals/data-models/manage-relationships#retrieve-records-of-relation" sectionTitle="retryStep"/>


setStepSuccess

This method sets an async step in a currently-executing long-running workflow as successful. The workflow will then continue to the next step.

Example

ts
// other imports...
import {
  TransactionHandlerType,
} from "@medusajs/framework/utils"

await workflowEngineModuleService.setStepSuccess({
  idempotencyKey: {
    action: TransactionHandlerType.INVOKE,
    transactionId,
    stepId: "step-2",
    workflowId: "hello-world",
  },
  stepResponse: new StepResponse("Done!"),
})

Parameters

<TypeList types={[ { "name": "idempotencyKey", "type": "object", "description": "The details of the step to set as successful.", "optional": false, "defaultValue": "", "expandable": false, "children": [ { "name": "action", "type": "invoke | compensate", "description": "If the step's compensation function is running, use compensate. Otherwise, use invoke.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "transactionId", "type": "string", "description": "The ID of the workflow execution's transaction.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "stepId", "type": "string", "description": "The ID of the step to change its status. This is the first parameter passed to createStep when creating the step.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "workflowId", "type": "string", "description": "The ID of the workflow. This is the first parameter passed to createWorkflow when creating the workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] }, { "name": "stepResponse", "type": "StepResponse", "description": "Set the response of the step. This is similar to the response you return in a step's definition, but since the async step doesn't have a response, you set its response when changing its status.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "options", "type": "object", "description": "Options to pass to the step.", "optional": true, "defaultValue": "", "expandable": false, "children": [ { "name": "container", "type": "Container", "description": "An instance of the Medusa container.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] } ]} expandUrl="https://docs.medusajs.com/learn/fundamentals/data-models/manage-relationships#retrieve-records-of-relation" sectionTitle="setStepSuccess"/>


setStepFailure

This method sets an async step in a currently-executing long-running workflow as failed. The workflow will then stop executing and the compensation functions of the workflow's steps will be executed.

Example

ts
// other imports...
import {
  TransactionHandlerType,
} from "@medusajs/framework/utils"

await workflowEngineModuleService.setStepFailure({
  idempotencyKey: {
    action: TransactionHandlerType.INVOKE,
    transactionId,
    stepId: "step-2",
    workflowId: "hello-world",
  },
  stepResponse: new StepResponse("Failed!"),
})

Parameters

<TypeList types={[ { "name": "idempotencyKey", "type": "object", "description": "The details of the step to set as failed.", "optional": false, "defaultValue": "", "expandable": false, "children": [ { "name": "action", "type": "invoke | compensate", "description": "If the step's compensation function is running, use compensate. Otherwise, use invoke.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "transactionId", "type": "string", "description": "The ID of the workflow execution's transaction.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "stepId", "type": "string", "description": "The ID of the step to change its status. This is the first parameter passed to createStep when creating the step.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "workflowId", "type": "string", "description": "The ID of the workflow. This is the first parameter passed to createWorkflow when creating the workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] }, { "name": "stepResponse", "type": "StepResponse", "description": "Set the response of the step. This is similar to the response you return in a step's definition, but since the async step doesn't have a response, you set its response when changing its status.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "options", "type": "object", "description": "Options to pass to the step.", "optional": true, "defaultValue": "", "expandable": false, "children": [ { "name": "container", "type": "Container", "description": "An instance of the Medusa container.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] } ]} expandUrl="https://docs.medusajs.com/learn/fundamentals/data-models/manage-relationships#retrieve-records-of-relation" sectionTitle="setStepFailure"/>


subscribe

This method subscribes to a workflow's events. You can use this method to listen to a long-running workflow's events and retrieve its result once it's done executing.

Refer to the Long-Running Workflows documentation to learn more.

Example

ts
const { transaction } = await helloWorldWorkflow(container).run()

const subscriptionOptions = {
  workflowId: "hello-world",
  transactionId: transaction.transactionId,
  subscriberId: "hello-world-subscriber",
}

await workflowEngineModuleService.subscribe({
  ...subscriptionOptions,
  subscriber: async (data) => {
    if (data.eventType === "onFinish") {
      console.log("Finished execution", data.result)
      // unsubscribe
      await workflowEngineModuleService.unsubscribe({
        ...subscriptionOptions,
        subscriberOrId: subscriptionOptions.subscriberId,
      })
    } else if (data.eventType === "onStepFailure") {
      console.log("Workflow failed", data.step)
    }
  },
})

Parameters

<TypeList types={[ { "name": "subscriptionOptions", "type": "object", "description": "The options for the subscription.", "optional": false, "defaultValue": "", "expandable": false, "children": [ { "name": "workflowId", "type": "string", "description": "The ID of the workflow to subscribe to. This is the first parameter passed to createWorkflow when creating the workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "transactionId", "type": "string", "description": "The ID of the workflow execution's transaction. This is returned when you execute a workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "subscriberId", "type": "string", "description": "A unique ID for the subscriber. It's used to unsubscribe from the workflow's events.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "subscriber", "type": "(data: WorkflowEvent) => void", "description": "The subscriber function that will be called when the workflow emits an event.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ] } ]} expandUrl="https://docs.medusajs.com/learn/fundamentals/data-models/manage-relationships#retrieve-records-of-relation" sectionTitle="subscribe"/>


unsubscribe

This method unsubscribes from a workflow's events. You can use this method to stop listening to a long-running workflow's events after you've received the result.

Example

ts
await workflowEngineModuleService.unsubscribe({
  workflowId: "hello-world",
  transactionId: "transaction-id",
  subscriberOrId: "hello-world-subscriber",
})

Parameters

<TypeList types={[ { "name": "workflowId", "type": "string", "description": "The ID of the workflow to unsubscribe from. This is the first parameter passed to createWorkflow when creating the workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "transactionId", "type": "string", "description": "The ID of the workflow execution's transaction. This is returned when you execute a workflow.", "optional": false, "defaultValue": "", "expandable": false, "children": [] }, { "name": "subscriberOrId", "type": "string", "description": "The subscriber ID or the subscriber function to unsubscribe from the workflow's events.", "optional": false, "defaultValue": "", "expandable": false, "children": [] } ]} expandUrl="https://docs.medusajs.com/learn/fundamentals/data-models/manage-relationships#retrieve-records-of-relation" sectionTitle="unsubscribe"/>