Workflow Definition

The Workflow Definition contains all the information necessary to define the behavior of a workflow. The most important part of this definition is the tasks property, which is an array of Task Configurations.

For the formal JSON Schema definitions of workflow and task structures, see the schemas/ directory in the repository.

Workflow Properties

| Field | Type | Description | Notes |
| --- | --- | --- | --- |
| name | string | Name of the workflow | |
| description | string | Description of the workflow | Optional |
| version | number | Numeric field used to identify the version of the schema. Use incrementing numbers. | When starting a workflow execution, if not specified, the definition with the highest version is used |
| tasks | array of object(s) | An array of task configurations. See Task Configurations | |
| inputParameters | array of string(s) | List of input parameters. Used for documenting the required inputs to the workflow | Optional |
| outputParameters | object | JSON template used to generate the output of the workflow | If not specified, the output is defined as the output of the last executed task |
| inputTemplate | object | Default input values. See Default Input with inputTemplate | Optional |
| failureWorkflow | string | Workflow to be run on current workflow failure. Useful for cleanup or post actions on failure. See Failure Workflow | Optional |
| schemaVersion | number | Current Conductor schema version. schemaVersion 1 is discontinued. | Must be 2 |
| restartable | boolean | Flag to allow workflow restarts | Defaults to true |
| workflowStatusListenerEnabled | boolean | Enable status callback. See Workflow Status Listener | Defaults to false |
| ownerEmail | string | Email address of the team that owns the workflow | Required |
| timeoutSeconds | number | The timeout in seconds after which the workflow will be marked as TIMED_OUT if it hasn't been moved to a terminal state | No timeouts if set to 0 |
| timeoutPolicy | string (enum) | Workflow's timeout policy | Defaults to TIME_OUT_WF |

Failure Workflow

The failure workflow gets the original failed workflow’s input along with four additional items:

  • workflowId - The id of the failed workflow which triggered the failure workflow.
  • reason - A string containing the reason for workflow failure.
  • failureStatus - A string status representation of the failed workflow.
  • failureTaskId - The id of the failed task of the workflow that triggered the failure workflow.
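
As a rough sketch of how these items might be consumed: because they arrive as part of the failure workflow's input, a task in the failure workflow can read them through ${workflow.input...} expressions. The task name notify_on_failure, its reference name, the input key names, and the owner email below are hypothetical, and the task would have to be registered separately.

```json
{
  "name": "shipping_issues",
  "description": "Hypothetical failure workflow; task and key names are illustrative",
  "version": 1,
  "schemaVersion": 2,
  "ownerEmail": "owner@example.com",
  "tasks": [
    {
      "name": "notify_on_failure",
      "taskReferenceName": "notify_on_failure_ref",
      "type": "SIMPLE",
      "inputParameters": {
        "failedWorkflowId": "${workflow.input.workflowId}",
        "reason": "${workflow.input.reason}",
        "status": "${workflow.input.failureStatus}",
        "failedTaskId": "${workflow.input.failureTaskId}"
      }
    }
  ]
}
```

The original workflow's own input keys would be readable in the same way, for example ${workflow.input.accountNumber} if the failed workflow was started with an accountNumber.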

Timeout Policy

  • TIME_OUT_WF: Workflow is marked as TIMED_OUT and terminated
  • ALERT_ONLY: Registers a counter (workflow_failure with status tag set to TIMED_OUT)
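
For instance, a definition that should be terminated once it has run for more than an hour might set the two timeout fields as in the fragment below. Only the relevant fields are shown; the rest of the definition is omitted, and the 3600-second value is an arbitrary illustration.

```json
{
  "timeoutSeconds": 3600,
  "timeoutPolicy": "TIME_OUT_WF"
}
```

Switching timeoutPolicy to ALERT_ONLY in the same fragment would register the counter described above in place of the TIME_OUT_WF behavior.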

Workflow Status Listener

Setting the workflowStatusListenerEnabled field in your Workflow Definition to true enables notifications.

To add a custom implementation of the Workflow Status Listener, refer to the Workflow Status Listener extension guide.

The listener can be implemented either to send a notification to an external system or to send an event on the Conductor queue that completes or fails another task in another workflow, as described in the event handlers guide.

Default Input with inputTemplate

  • inputTemplate allows you to define default input values, which can optionally be overridden at runtime (when the workflow is invoked).
  • For example, in your Workflow Definition you can define inputTemplate as:
json
"inputTemplate": {
    "url": "https://some_url:7004"
}

With this template, url resolves to https://some_url:7004 if no url is provided as input to the workflow.
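
To illustrate the override behavior, suppose a definition declares two defaults. The retries key and its value are hypothetical additions made only for this sketch.

```json
{
  "inputTemplate": {
    "url": "https://some_url:7004",
    "retries": 3
  }
}
```

If the workflow were then started with the input {"retries": 5}, the expectation is that url falls back to the default https://some_url:7004 while retries takes the caller's value of 5.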

Task Configurations

The tasks property in a Workflow Definition defines an array of Task Configurations. This is the blueprint for the workflow. Task Configurations can reference different types of Tasks.

  • Simple Tasks
  • System Tasks
  • Operators

Note: Task Configurations should not be confused with Task Definitions, which are used to register SIMPLE (worker-based) tasks.

| Field | Type | Description | Notes |
| --- | --- | --- | --- |
| name | string | Name of the task. MUST be registered as a Task Type with Conductor before starting the workflow | |
| taskReferenceName | string | Alias used to refer to the task within the workflow. MUST be unique within the workflow. | |
| type | string | Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types | |
| description | string | Description of the task | Optional |
| optional | boolean | true or false. When set to true, the workflow continues even if the task fails. The status of the task is reflected as COMPLETED_WITH_ERRORS | Defaults to false |
| inputParameters | object | JSON template that defines the input given to the task. Only one of inputParameters or inputExpression can be used in a task. | See Using Expressions for details |
| inputExpression | object | JSONPath expression that defines the input given to the task. Only one of inputParameters or inputExpression can be used in a task. | See Using Expressions for details |
| asyncComplete | boolean | false to mark status COMPLETED upon execution; true to keep the task IN_PROGRESS and wait for an external event to complete it. | Defaults to false |
| startDelay | number | Time in seconds to wait before making the task available to be polled by a worker. | Defaults to 0 |

In addition to these parameters, System Tasks have their own parameters. Check out System Tasks for more information.
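
As an illustration of the fields above, the following sketch configures a worker task that is optional and is held back before it becomes available for polling. The task name send_receipt, its reference name, and the input key are hypothetical.

```json
{
  "name": "send_receipt",
  "taskReferenceName": "send_receipt_ref",
  "type": "SIMPLE",
  "description": "Hypothetical optional task",
  "optional": true,
  "startDelay": 30,
  "inputParameters": {
    "account": "${workflow.input.accountNumber}"
  }
}
```

With optional set to true, a failure of this task would be reflected as COMPLETED_WITH_ERRORS and the workflow would continue. A startDelay of 30 keeps the task from being polled for 30 seconds after it is scheduled.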

Using Expressions

Each executed task is given an input based on the inputParameters template or the inputExpression configured in the task configuration. Only one of inputParameters or inputExpression can be used in a task.

inputParameters

inputParameters can use JSONPath expressions to extract values out of the workflow input and other tasks in the workflow.

For example, workflows are supplied an input by the client/caller when a new execution is triggered. The workflow input is available via an expression of the form ${workflow.input...}. Likewise, the input and output data of a previously executed task can also be extracted using an expression for use in the inputParameters of a subsequent task.

Generally, inputParameters can use expressions of the following syntax:

${SOURCE.input/output.JSONPath}

| Field | Description |
| --- | --- |
| SOURCE | Can be either "workflow" or the reference name of any task |
| input/output | Refers to either the input or output of the source |
| JSONPath | JSON path expression to extract JSON fragment from source's input/output |

!!! note "JSON Path Support"
    Conductor supports the JSONPath specification and uses the jayway/JsonPath Java implementation.

!!! note "Escaping expressions"
    To escape an expression, prefix it with an extra $ character (e.g. $${workflow.input...}).
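
The fragment below sketches three cases side by side: a value taken from the workflow input, a value taken from another task's output, and an escaped expression. The reference name lookup_ref and all key names are hypothetical.

```json
{
  "inputParameters": {
    "account": "${workflow.input.accountNumber}",
    "firstLanguage": "${lookup_ref.output.languages[0]}",
    "literalText": "$${workflow.input.accountNumber}"
  }
}
```

Under the escaping rule, literalText should reach the task as the unevaluated text ${workflow.input.accountNumber} rather than as the account number itself.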

inputExpression

inputExpression can be used to select an entire object from the workflow input, or the output of another task. The field supports all definite JSONPath expressions.

The syntax for mapping values in inputExpression follows the pattern:

SOURCE.input/output.JSONPath

NOTE: The inputExpression field does not require the expression to be wrapped in ${}.

See Example 3 below.
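
As a further sketch of the pattern, the configuration below would pass the whole output of a task referenced as shipping_info_ref to shipping_task. It borrows the task names from Example 1 and the expression/type object shape from Example 3, and should be read as illustrative rather than as a tested configuration.

```json
{
  "name": "shipping_task",
  "taskReferenceName": "shipping_task_ref",
  "type": "SIMPLE",
  "inputExpression": {
    "expression": "shipping_info_ref.output",
    "type": "JSON_PATH"
  }
}
```

Because only one of inputParameters or inputExpression can be used in a task, this configuration carries no inputParameters block.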

Examples

Example 1 - A Basic Workflow Definition

Assume your business logic is simply to get some shipping information and then do the shipping. You start by logically partitioning it into two tasks:

  1. shipping_info - The first task takes the provided account number and outputs an address.
  2. shipping_task - The second task takes the address info and generates a shipping label.

We can configure these two tasks in the tasks array of our Workflow Definition. Let's assume that shipping_info takes an account number and returns a name and address.

json
{
  "name": "mail_a_box",
  "description": "shipping Workflow",
  "version": 1,
  "tasks": [
    {
      "name": "shipping_info",
      "taskReferenceName": "shipping_info_ref",
      "inputParameters": {
        "account": "${workflow.input.accountNumber}"
      },
      "type": "SIMPLE"
    },
    {
      "name": "shipping_task",
      "taskReferenceName": "shipping_task_ref",
      "inputParameters": {
        "name": "${shipping_info_ref.output.name}",
        "streetAddress": "${shipping_info_ref.output.streetAddress}",
        "city": "${shipping_info_ref.output.city}",
        "state": "${shipping_info_ref.output.state}",
        "zipcode": "${shipping_info_ref.output.zipcode}"
      },
      "type": "SIMPLE"
    }
  ],
  "outputParameters": {
    "trackingNumber": "${shipping_task_ref.output.trackingNumber}"
  },
  "failureWorkflow": "shipping_issues",
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "ownerEmail": "[email protected]",
  "timeoutPolicy": "ALERT_ONLY",
  "timeoutSeconds": 0,
  "variables": {},
  "inputTemplate": {}
}

Upon completion of the two tasks, the workflow outputs the tracking number generated in the second task. If the workflow fails, a second workflow named shipping_issues is run.

Example 2 - Task Configuration

Consider a task http_task whose input is configured to use values from the workflow input and from the output of a task named loc_task.

json
{
  "name": "encode_workflow",
  "description": "Encode movie.",
  "version": 1,
  "inputParameters": [
    "movieId", "fileLocation", "recipe"
  ],
  "tasks": [
    {
      "name": "loc_task",
      "taskReferenceName": "loc_task_ref",
      "type": "SIMPLE",
      ...
    },
    {
      "name": "http_task",
      "taskReferenceName": "http_task_ref",
      "type": "HTTP",
      "inputParameters": {
        "movieId": "${workflow.input.movieId}",
        "url": "${workflow.input.fileLocation}",
        "lang": "${loc_task_ref.output.languages[0]}",
        "http_request": {
          "method": "POST",
          "url": "http://example.com/${loc_task_ref.output.fileId}/encode",
          "body": {
            "recipe": "${workflow.input.recipe}",
            "params": {
              "width": 100,
              "height": 100
            }
          },
          "headers": {
            "Accept": "application/json",
            "Content-Type": "application/json"
          }
        }
      }
    }
  ],
  "ownerEmail": "[email protected]",
  "variables": {},
  "inputTemplate": {}
}

Consider the following as the workflow input:

json
{
  "movieId": "movie_123",
  "fileLocation":"s3://moviebucket/file123",
  "recipe":"png"
}

And the following as the output of loc_task:

json
{
  "fileId": "file_xxx_yyy_zzz",
  "languages": ["en","ja","es"]
}

When scheduling the task, Conductor will merge the values from workflow input and loc_task's output and create the input to the http_task as follows:

json
{
  "movieId": "movie_123",
  "url": "s3://moviebucket/file123",
  "lang": "en",
  "http_request": {
    "method": "POST",
    "url": "http://example.com/file_xxx_yyy_zzz/encode",
    "body": {
      "recipe": "png",
      "params": {
        "width": 100,
        "height": 100
      }
    },
    "headers": {
      "Accept": "application/json",
      "Content-Type": "application/json"
    }
  }
}

Example 3 - inputExpression

Given the following task configuration:

json
{
  "name": "loc_task",
  "taskReferenceName": "loc_task_ref",
  "type": "SIMPLE",
  "inputExpression": {
    "expression": "workflow.input",
    "type": "JSON_PATH"
  }  
}

Suppose the workflow is invoked with the following workflow input:

json
{
  "movieId": "movie_123",
  "fileLocation":"s3://moviebucket/file123",
  "recipe":"png"
}

When the task loc_task is scheduled, the entire workflow input object will be passed in as the task input:

json
{
  "movieId": "movie_123",
  "fileLocation":"s3://moviebucket/file123",
  "recipe":"png"
}