Back to Iii

Queue

docs/modules/module-queue.mdx

0.13.011.2 KB
Original Source

A module for asynchronous job processing. It supports two modes: topic-based queues (register a consumer per topic, emit events) and named queues (enqueue function calls via TriggerAction.Enqueue, no trigger registration).

modules::queue::QueueModule
<Info title="How-to guidance"> For step-by-step instructions on configuring and using queues, see [Use Queues](../how-to/use-queues). For DLQ management, see [Manage Failed Triggers](../how-to/dead-letter-queues). </Info>

Queue Modes

Topic-based queues

Register a consumer for a topic and emit events to it.

  1. Register a consumer with registerTrigger({ type: 'queue', function_id: 'my::handler', config: { topic: 'order.created' } }). This subscribes the handler to that topic.
  2. Emit events by calling trigger({ function_id: 'enqueue', payload: { topic: 'order.created', data: payload } }) or trigger({ function_id: 'enqueue', payload: { topic, data }, action: TriggerAction.Void() }) for fire-and-forget. The enqueue function routes the payload to all subscribers of that topic.
  3. Action on the trigger: the handler receives the data as its input. Optional queue_config on the trigger controls per-subscriber retries and concurrency.

The producer knows the topic name; consumers register to receive it. Queue settings can live at the trigger registration site.

Named queues

Define queues in iii-config.yaml, then enqueue function calls directly. No trigger registration needed.

  1. Define queues in queue_configs (see Configuration).
  2. Enqueue a function call with trigger({ function_id: 'orders::process', payload, action: TriggerAction.Enqueue({ queue: 'payment' }) }). The engine routes the job to the named queue and invokes the function when a worker consumes it.
  3. Action on the trigger: the target function receives payload as its input. Retries, concurrency, and FIFO are configured centrally in iii-config.yaml.

The producer targets the function and queue explicitly. Queue configuration is centralized. For a hands-on walkthrough, see Use Queues.

When to use which

Topic-basedNamed queues
ProducerCalls trigger({ function_id: 'enqueue', payload: { topic, data } })Calls trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })
ConsumerRegisters registerTrigger({ type: 'queue', config: { topic } })No registration — function is the target
ConfigOptional queue_config on triggerqueue_configs in iii-config.yaml
Use casePub/sub, multiple subscribers per topicDirect function invocation with retries, FIFO, DLQ

Both modes are valid. Named queues offer config-driven retries, concurrency, and FIFO ordering.

<Info title="Trigger actions"> Named queues use the `Enqueue` trigger action. For a full comparison of synchronous, Void, and Enqueue invocation modes, see [Trigger Actions](../how-to/trigger-actions). </Info>

Sample Configuration

yaml
- class: modules::queue::QueueModule
  config:
    queue_configs:
      default:
        max_retries: 5
        concurrency: 5
        type: standard
      payment:
        max_retries: 10
        concurrency: 2
        type: fifo
        message_group_field: transaction_id
    adapter:
      class: modules::queue::BuiltinQueueAdapter
      config:
        store_method: file_based
        file_path: ./data/queue_store

Configuration

<ResponseField name="queue_configs" type="map[string, FunctionQueueConfig]" required> A map of named queue configurations. Each key is the queue name referenced in `TriggerAction.Enqueue({ queue: 'name' })`. Define a queue named `default` in config for the common case; reference it as `TriggerAction.Enqueue({ queue: 'default' })`. </ResponseField> <ResponseField name="adapter" type="Adapter"> The transport adapter for queue persistence and distribution. Defaults to `modules::queue::BuiltinQueueAdapter` when not specified. </ResponseField>

Queue Configuration

Each entry in queue_configs defines an independent named queue with its own retry, concurrency, and ordering settings.

<ResponseField name="max_retries" type="u32"> Maximum delivery attempts before routing the job to the dead-letter queue. Defaults to `3`. </ResponseField> <ResponseField name="concurrency" type="u32"> Maximum number of jobs processed simultaneously from this queue. Defaults to `10`. For FIFO queues, the engine overrides this to `prefetch=1` to guarantee ordering — see the note below. </ResponseField> <ResponseField name="type" type="string"> Delivery mode: `standard` (concurrent, default) or `fifo` (ordered within a message group). </ResponseField> <ResponseField name="message_group_field" type="string"> Required when `type` is `fifo`. The JSON field in the job payload whose value determines the ordering group. Jobs with the same group value are processed strictly in order. The field must be present and non-null in every enqueued payload. </ResponseField> <ResponseField name="backoff_ms" type="u64"> Base retry backoff in milliseconds. Applied with exponential scaling: `backoff_ms × 2^(attempt − 1)`. Defaults to `1000`. </ResponseField> <ResponseField name="poll_interval_ms" type="u64"> Worker poll interval in milliseconds. Defaults to `100`. </ResponseField> <Warning title="FIFO and concurrency"> When `type` is `fifo`, the engine sets `prefetch=1` regardless of the `concurrency` value. This ensures only one job is in-flight at a time, which is required for strict ordering. FIFO queues also retry failed jobs inline (blocking the queue) rather than in parallel. </Warning>

Adapters

modules::queue::BuiltinQueueAdapter

Built-in in-process queue. No external dependencies. Suitable for single-instance deployments — messages are not shared across engine instances.

yaml
class: modules::queue::BuiltinQueueAdapter
config:
  store_method: file_based   # in_memory | file_based
  file_path: ./data/queue_store  # required when store_method is file_based
<ResponseField name="store_method" type="string"> Persistence strategy: `in_memory` (lost on restart) or `file_based` (durable across restarts). Defaults to `in_memory`. </ResponseField> <ResponseField name="file_path" type="string"> Path to the queue store directory. Required when `store_method` is `file_based`. </ResponseField>

modules::queue::RedisAdapter

Uses Redis as the queue backend for topic-based pub/sub. Enables message distribution across multiple engine instances.

<Warning title="Limited named queue support"> The Redis adapter supports publishing to named queues but does not implement named queue consumption, retries, or dead-letter queues. It is suitable for topic-based pub/sub only. For full named queue support in multi-instance deployments, use the [RabbitMQ adapter](#modulesqueuerabbitmqadapter). </Warning>
yaml
class: modules::queue::RedisAdapter
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
<ResponseField name="redis_url" type="string"> The URL of the Redis instance to connect to. </ResponseField>

modules::queue::RabbitMQAdapter

Uses RabbitMQ as the queue backend. Supports durable delivery, retries, and dead-letter queues across multiple engine instances.

The engine owns consumer loops, retry acknowledgement, and backoff logic — RabbitMQ is used as a transport only. Retry uses explicit ack + republish to a retry exchange with an x-attempt header, keeping compatibility with both classic and quorum queues.

yaml
class: modules::queue::RabbitMQAdapter
config:
  amqp_url: ${RABBITMQ_URL:amqp://localhost:5672}
<ResponseField name="amqp_url" type="string"> The AMQP URL of the RabbitMQ instance to connect to. </ResponseField>

Queue naming in RabbitMQ

For each named queue defined in queue_configs, iii creates the following RabbitMQ resources:

ResourceFormatExample (payment)
Main exchangeiii.__fn_queue::<name>iii.__fn_queue::payment
Main queueiii.__fn_queue::<name>.queueiii.__fn_queue::payment.queue
Retry exchangeiii.__fn_queue::<name>::retryiii.__fn_queue::payment::retry
Retry queueiii.__fn_queue::<name>::retry.queueiii.__fn_queue::payment::retry.queue
DLQ exchangeiii.__fn_queue::<name>::dlqiii.__fn_queue::payment::dlq
DLQ queueiii.__fn_queue::<name>::dlq.queueiii.__fn_queue::payment::dlq.queue
<Info title="Why so many resources?"> Each named queue creates six RabbitMQ objects to support delayed retry and dead-lettering. For the design rationale, see [Queue Architecture](/architecture/queues). </Info>

Adapter Comparison

BuiltinQueueAdapterRabbitMQAdapterRedisAdapter
RetriesYesYesNo
Dead-letter queueYesYesNo
FIFO orderingYesYesNo
Named queue consumptionYesYesNo (publish only)
Topic-based pub/subYesYesYes
Multi-instanceNoYesYes
External dependencyNoneRabbitMQRedis
<Info title="Choosing an adapter"> For practical guidance on which adapter to use in different deployment scenarios, see [Use Queues — Choosing an Adapter](../how-to/use-queues#choosing-an-adapter). </Info>

Builtin Functions

The queue module registers the following functions automatically when it initializes. These are callable via trigger() from any SDK or via the iii trigger CLI command.

enqueue

Publishes a message to a topic-based queue.

FieldTypeDescription
topicstringThe topic to publish to (required, non-empty)
dataanyThe payload to deliver to subscribers

Returns null on success.

iii::queue::redrive

Moves all messages from a named queue's dead-letter queue back to the main queue. Each message gets its attempt counter reset to zero.

Input:

FieldTypeDescription
queuestringThe named queue whose DLQ should be redriven (required, non-empty)

Output:

FieldTypeDescription
queuestringThe queue name that was redriven
redrivennumberThe number of messages moved back to the main queue

CLI example:

bash
iii trigger \
  --function-id='iii::queue::redrive' \
  --payload='{"queue": "payment"}'
<Info title="DLQ operations"> For a complete guide on inspecting DLQ messages before redriving, see [Manage Failed Triggers](../how-to/dead-letter-queues). </Info>

Queue Flow

mermaid
sequenceDiagram
    participant C as Caller
    participant E as Engine
    participant Q as Queue Adapter
    participant F as Target Function

    C->>E: trigger({ function_id, payload, action: Enqueue({ queue }) })
    E->>E: Validate queue config & FIFO field
    E->>Q: Enqueue job
    Q-->>E: Job accepted
    E-->>C: { messageReceiptId }

    loop Consumer
        Q->>E: Job available
        E->>F: Invoke with payload
        alt Success
            F-->>E: Return value
            E->>Q: Ack
        else Failure — attempt < max_retries
            F-->>E: Error
            E->>Q: Ack + republish with backoff delay
        else Failure — attempts exhausted
            F-->>E: Error
            E->>Q: Route to DLQ
        end
    end