docs/modules/module-queue.mdx
A module for asynchronous job processing. It supports two modes: topic-based queues (register a consumer per topic, emit events) and named queues (enqueue function calls via TriggerAction.Enqueue, no trigger registration).
modules::queue::QueueModule
Register a consumer for a topic and emit events to it.
registerTrigger({ type: 'queue', function_id: 'my::handler', config: { topic: 'order.created' } }). This subscribes the handler to that topic.trigger({ function_id: 'enqueue', payload: { topic: 'order.created', data: payload } }) or trigger({ function_id: 'enqueue', payload: { topic, data }, action: TriggerAction.Void() }) for fire-and-forget. The enqueue function routes the payload to all subscribers of that topic.data as its input. Optional queue_config on the trigger controls per-subscriber retries and concurrency.The producer knows the topic name; consumers register to receive it. Queue settings can live at the trigger registration site.
Define queues in iii-config.yaml, then enqueue function calls directly. No trigger registration needed.
queue_configs (see Configuration).trigger({ function_id: 'orders::process', payload, action: TriggerAction.Enqueue({ queue: 'payment' }) }). The engine routes the job to the named queue and invokes the function when a worker consumes it.payload as its input. Retries, concurrency, and FIFO are configured centrally in iii-config.yaml.The producer targets the function and queue explicitly. Queue configuration is centralized. For a hands-on walkthrough, see Use Queues.
| Topic-based | Named queues | |
|---|---|---|
| Producer | Calls trigger({ function_id: 'enqueue', payload: { topic, data } }) | Calls trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) |
| Consumer | Registers registerTrigger({ type: 'queue', config: { topic } }) | No registration — function is the target |
| Config | Optional queue_config on trigger | queue_configs in iii-config.yaml |
| Use case | Pub/sub, multiple subscribers per topic | Direct function invocation with retries, FIFO, DLQ |
Both modes are valid. Named queues offer config-driven retries, concurrency, and FIFO ordering.
<Info title="Trigger actions"> Named queues use the `Enqueue` trigger action. For a full comparison of synchronous, Void, and Enqueue invocation modes, see [Trigger Actions](../how-to/trigger-actions). </Info>- class: modules::queue::QueueModule
config:
queue_configs:
default:
max_retries: 5
concurrency: 5
type: standard
payment:
max_retries: 10
concurrency: 2
type: fifo
message_group_field: transaction_id
adapter:
class: modules::queue::BuiltinQueueAdapter
config:
store_method: file_based
file_path: ./data/queue_store
Each entry in queue_configs defines an independent named queue with its own retry, concurrency, and ordering settings.
Built-in in-process queue. No external dependencies. Suitable for single-instance deployments — messages are not shared across engine instances.
class: modules::queue::BuiltinQueueAdapter
config:
store_method: file_based # in_memory | file_based
file_path: ./data/queue_store # required when store_method is file_based
Uses Redis as the queue backend for topic-based pub/sub. Enables message distribution across multiple engine instances.
<Warning title="Limited named queue support"> The Redis adapter supports publishing to named queues but does not implement named queue consumption, retries, or dead-letter queues. It is suitable for topic-based pub/sub only. For full named queue support in multi-instance deployments, use the [RabbitMQ adapter](#modulesqueuerabbitmqadapter). </Warning>class: modules::queue::RedisAdapter
config:
redis_url: ${REDIS_URL:redis://localhost:6379}
Uses RabbitMQ as the queue backend. Supports durable delivery, retries, and dead-letter queues across multiple engine instances.
The engine owns consumer loops, retry acknowledgement, and backoff logic — RabbitMQ is used as a transport only. Retry uses explicit ack + republish to a retry exchange with an x-attempt header, keeping compatibility with both classic and quorum queues.
class: modules::queue::RabbitMQAdapter
config:
amqp_url: ${RABBITMQ_URL:amqp://localhost:5672}
For each named queue defined in queue_configs, iii creates the following RabbitMQ resources:
| Resource | Format | Example (payment) |
|---|---|---|
| Main exchange | iii.__fn_queue::<name> | iii.__fn_queue::payment |
| Main queue | iii.__fn_queue::<name>.queue | iii.__fn_queue::payment.queue |
| Retry exchange | iii.__fn_queue::<name>::retry | iii.__fn_queue::payment::retry |
| Retry queue | iii.__fn_queue::<name>::retry.queue | iii.__fn_queue::payment::retry.queue |
| DLQ exchange | iii.__fn_queue::<name>::dlq | iii.__fn_queue::payment::dlq |
| DLQ queue | iii.__fn_queue::<name>::dlq.queue | iii.__fn_queue::payment::dlq.queue |
| BuiltinQueueAdapter | RabbitMQAdapter | RedisAdapter | |
|---|---|---|---|
| Retries | Yes | Yes | No |
| Dead-letter queue | Yes | Yes | No |
| FIFO ordering | Yes | Yes | No |
| Named queue consumption | Yes | Yes | No (publish only) |
| Topic-based pub/sub | Yes | Yes | Yes |
| Multi-instance | No | Yes | Yes |
| External dependency | None | RabbitMQ | Redis |
The queue module registers the following functions automatically when it initializes. These are callable via trigger() from any SDK or via the iii trigger CLI command.
Publishes a message to a topic-based queue.
| Field | Type | Description |
|---|---|---|
topic | string | The topic to publish to (required, non-empty) |
data | any | The payload to deliver to subscribers |
Returns null on success.
Moves all messages from a named queue's dead-letter queue back to the main queue. Each message gets its attempt counter reset to zero.
Input:
| Field | Type | Description |
|---|---|---|
queue | string | The named queue whose DLQ should be redriven (required, non-empty) |
Output:
| Field | Type | Description |
|---|---|---|
queue | string | The queue name that was redriven |
redriven | number | The number of messages moved back to the main queue |
CLI example:
iii trigger \
--function-id='iii::queue::redrive' \
--payload='{"queue": "payment"}'
sequenceDiagram
participant C as Caller
participant E as Engine
participant Q as Queue Adapter
participant F as Target Function
C->>E: trigger({ function_id, payload, action: Enqueue({ queue }) })
E->>E: Validate queue config & FIFO field
E->>Q: Enqueue job
Q-->>E: Job accepted
E-->>C: { messageReceiptId }
loop Consumer
Q->>E: Job available
E->>F: Invoke with payload
alt Success
F-->>E: Return value
E->>Q: Ack
else Failure — attempt < max_retries
F-->>E: Error
E->>Q: Ack + republish with backoff delay
else Failure — attempts exhausted
F-->>E: Error
E->>Q: Route to DLQ
end
end