docs/devguide/how-tos/event-bus.md
Conductor integrates with external messaging systems to enable event-driven workflow orchestration. You can publish events from workflows and react to external events — starting workflows, completing tasks, or failing tasks based on incoming messages.
| System | Sink prefix | Module | Use case |
|---|---|---|---|
| Kafka | kafka | kafka | High-throughput, durable event streaming |
| NATS | nats | nats | Lightweight, low-latency messaging |
| NATS Streaming | nats-stream | nats-streaming | Durable NATS with replay (legacy) |
| NATS JetStream | nats | nats | Modern durable NATS streaming |
| AMQP (RabbitMQ) | amqp, amqp_queue, amqp_exchange | amqp | Traditional message queuing with routing |
| SQS | sqs | sqs | AWS-native message queuing |
| Conductor | conductor | built-in | Internal event routing between workflows |
Event bus orchestration has two sides:
┌──────────────┐ Event Task ┌──────────────┐ Event Handler ┌──────────────┐
│ Workflow A │ ──────────────────► │ Event Bus │ ──────────────────► │ Workflow B │
│ │ (publish) │ (Kafka/NATS/ │ (start_workflow) │ (triggered) │
│ │ │ AMQP/SQS) │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
The Event task publishes a message to any supported event bus. The sink parameter determines the target:
{
"name": "notify_downstream",
"taskReferenceName": "notify_ref",
"type": "EVENT",
"sink": "kafka:order-events",
"inputParameters": {
"orderId": "${workflow.input.orderId}",
"status": "PROCESSED"
}
}
For Kafka-specific features (custom headers, key, serializers), use the dedicated Kafka Publish task:
{
"name": "publish_to_kafka",
"taskReferenceName": "kafka_ref",
"type": "KAFKA_PUBLISH",
"inputParameters": {
"kafka_request": {
"topic": "order-events",
"value": "${workflow.input.orderData}",
"bootStrapServers": "kafka:9092",
"headers": {
"X-Correlation-Id": "${workflow.correlationId}"
}
}
}
}
The sink parameter follows the format prefix:queue_name:
| Example | System |
|---|---|
kafka:order-events | Kafka topic order-events |
nats:notifications | NATS subject notifications |
amqp:task-queue | AMQP queue task-queue |
amqp_exchange:events | AMQP exchange events |
sqs:my-queue | SQS queue my-queue |
conductor | Conductor internal queue |
conductor:workflow_name:queue_name | Conductor internal, specific queue |
Event handlers listen for messages on an event bus and execute actions when a matching event arrives. Register them via the /api/event API.
{
"name": "order_event_handler",
"event": "kafka:order-events",
"condition": "$.status == 'PROCESSED'",
"actions": [
{
"action": "start_workflow",
"start_workflow": {
"name": "fulfillment_workflow",
"input": {
"orderId": "${orderId}"
}
}
}
]
}
| Action | Description |
|---|---|
start_workflow | Start a new workflow execution with the event payload as input. |
complete_task | Complete a waiting task (e.g., a WAIT or HUMAN task) in a running workflow. |
fail_task | Fail a task in a running workflow. |
The condition field supports JavaScript-like expressions evaluated against the event payload:
| Expression | Result |
|---|---|
$.version > 1 | true if version field > 1 |
$.metadata.codec == 'aac' | true if nested field matches |
$.status == 'COMPLETED' | true if status is COMPLETED |
Actions execute only when the condition evaluates to true. If no condition is specified, actions execute for every event.
Decouple workflows using events instead of sub-workflows:
{
"name": "order_pipeline",
"tasks": [
{
"name": "process_order",
"taskReferenceName": "process_ref",
"type": "SIMPLE"
},
{
"name": "notify_fulfillment",
"taskReferenceName": "notify_ref",
"type": "EVENT",
"sink": "kafka:fulfillment-requests",
"inputParameters": {
"orderId": "${workflow.input.orderId}",
"items": "${process_ref.output.items}"
}
}
]
}
A separate event handler starts the fulfillment workflow when the event arrives.
Combine a WAIT task with an event handler to pause a workflow until an external system signals completion:
{
"name": "wait_for_approval",
"taskReferenceName": "approval_ref",
"type": "WAIT"
}
Register an event handler that completes the task when an approval event arrives:
{
"name": "approval_handler",
"event": "kafka:approval-events",
"condition": "$.approved == true",
"actions": [
{
"action": "complete_task",
"complete_task": {
"workflowId": "${workflowId}",
"taskRefName": "approval_ref",
"output": {
"approvedBy": "${approvedBy}"
}
}
}
]
}
Each event bus module requires its own configuration. Enable the modules you need in your Conductor server configuration:
conductor.event-queues.kafka.enabled=true
conductor.event-queues.kafka.bootstrap-servers=kafka:9092
conductor.event-queues.nats.enabled=true
conductor.event-queues.nats.url=nats://localhost:4222
conductor.event-queues.amqp.enabled=true
conductor.event-queues.amqp.hosts=rabbitmq
conductor.event-queues.amqp.port=5672
conductor.event-queues.amqp.username=guest
conductor.event-queues.amqp.password=guest
Refer to the module source code for the full set of configuration properties.