Back to Rivet

Introducing Queues for Rivet Actors

website/src/content/posts/2026-02-25-queues-for-rivet-actors/page.mdx

2.2.114.0 KB
Original Source

Today we're releasing Queues for Rivet Actors: per-actor durable queues with a programmable run handler. Also known as the actor mailbox pattern.

  • Durable and ordered: messages persist through sleep, crashes, and deploys, processed one at a time
  • Handles traffic spikes: absorbs bursts of messages without dropping any
  • Request/response: callers can await a typed response from queued work
  • Programmable run handler: run is a long-lived async function you control, not a callback. Selectively consume named queues, race messages against each other, cancel work mid-flight.
  • Pairs with workflows: use queues as input to durable, replayable workflows
  • Built into the actor: queues, state, SQLite, events, and workflows, all in one place. No external broker to provision.

Show Me The Code

Define typed queues, process them in a run handler, and send messages from a client.

<CodeGroup> ```ts Basic import { actor, queue, setup } from "rivetkit";

const counter = actor({ state: { value: 0 }, // Define typed queues queues: { increment: queue<{ amount: number }>(), }, // Process messages in the run handler run: async (c) => { for await (const message of c.queue.iter()) { c.state.value += message.body.amount; } }, actions: { getValue: (c) => c.state.value, }, });

export const registry = setup({ use: { counter } });


```ts Request/Response
import { actor, queue, setup } from "rivetkit";

const counter = actor({
  state: { value: 0 },
  queues: {
    // Second type parameter is the response type
    increment: queue<{ amount: number }, { value: number }>(),
  },
  run: async (c) => {
    // Enable completable to allow responding to callers
    for await (const message of c.queue.iter({ completable: true })) {
      c.state.value += message.body.amount;
      // Send typed response back to the caller
      await message.complete({ value: c.state.value });
    }
  },
});

export const registry = setup({ use: { counter } });
ts
import { actor, queue, setup } from "rivetkit";

const worker = actor({
  state: { processed: 0 },
  queues: {
    // undefined response type means ack-only, no return data
    process: queue<{ taskId: string }, undefined>(),
  },
  run: async (c) => {
    for await (const message of c.queue.iter({ completable: true })) {
      // Do work
      await processTask(message.body.taskId);
      c.state.processed += 1;
      // Acknowledge completion without returning data
      await message.complete();
    }
  },
});

async function processTask(taskId: string) {
  await fetch(`https://api.example.com/tasks/${taskId}/complete`, {
    method: "POST",
  });
}

export const registry = setup({ use: { worker } });
</CodeGroup>

Sending messages from a client:

ts
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";

const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);

// Fire-and-forget
await handle.send("increment", { amount: 1 });

// Wait for a typed response
const result = await handle.send(
  "increment",
  { amount: 5 },
  { wait: true, timeout: 5_000 },
);

if (result.status === "completed") {
  console.log(result.response); // { value: 6 }
} else if (result.status === "timedOut") {
  console.log("timed out");
}

The Run Handler

The run handler is the heart of an actor. It's a long-lived async function that owns the actor's main processing. Instead of registering callbacks, you write it yourself: iterate queues, sleep between ticks, race signals against each other. You control exactly how and when messages are consumed.

<CodeGroup> ```ts Message Loop import { actor, queue, setup } from "rivetkit";

const worker = actor({ state: { processed: 0 }, queues: { jobs: queue<{ url: string }>(), }, // Iterate messages as they arrive run: async (c) => { for await (const message of c.queue.iter()) { await fetch(message.body.url, { method: "POST" }); c.state.processed += 1; } }, });

export const registry = setup({ use: { worker } });


```ts Tick Loop
import { actor, setup } from "rivetkit";
import { interval } from "rivetkit/utils";

const gameRoom = actor({
  state: {
    tick: 0,
    players: {} as Record<string, { x: number; y: number }>,
  },
  // Fixed-interval game loop
  run: async (c) => {
    const tick = interval(100); // 10 ticks per second
    while (!c.aborted) {
      await tick();
      if (c.aborted) break;

      c.state.tick += 1;

      // Update physics, check collisions, etc.
      for (const player of Object.values(c.state.players)) {
        player.x = Math.max(0, Math.min(1000, player.x));
        player.y = Math.max(0, Math.min(1000, player.y));
      }

      c.broadcast("snapshot", c.state);
    }
  },
  actions: {
    setInput: (c, input: { x: number; y: number }) => {
      c.state.players[c.conn.id] = input;
    },
  },
});

export const registry = setup({ use: { gameRoom } });
ts
import { openai } from "@ai-sdk/openai";
import { generateText } from "ai";
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";

const agent = actor({
  state: { running: false, messages: [] as string[] },
  queues: {
    // Separate queues for different message types
    prompt: queue<{ prompt: string }, undefined>(),
    stop: queue<{ reason?: string }>(),
  },
  run: async (c) => {
    // Only consume from the prompt queue
    for await (const promptMessage of c.queue.iter({
      names: ["prompt"],
      completable: true,
    })) {
      const stopController = new AbortController();
      const runSignal = joinSignals(c.abortSignal, stopController.signal);

      // Race: watch for stop messages while generating
      c.queue
        .next({ names: ["stop"], signal: runSignal })
        .then((stopMessage) => {
          if (stopMessage) stopController.abort();
        })
        .catch(() => {});

      // Generate until complete or cancelled
      c.state.running = true;
      const { text } = await generateText({
        model: openai("gpt-5"),
        prompt: promptMessage.body.prompt,
        abortSignal: runSignal,
      }).finally(() => {
        stopController.abort();
        c.state.running = false;
      });

      c.state.messages.push(text);
      await promptMessage.complete();
    }
  },
});

export const registry = setup({ use: { agent } });
ts
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";

const worker = actor({
  state: {},
  createVars: () => ({
    cancelController: new AbortController(),
  }),
  queues: {
    jobs: queue<{ id: string }>(),
  },
  actions: {
    // Cancel processing from outside
    cancelProcessing: async (c) => {
      c.vars.cancelController.abort();
    },
  },
  run: async (c) => {
    while (!c.aborted) {
      // Combine actor shutdown signal with custom cancel signal
      const signal = joinSignals(
        c.abortSignal,
        c.vars.cancelController.signal,
      );

      try {
        const message = await c.queue.next({ signal });
        if (!message) continue;
        console.log("Processing job", message.body.id);
      } catch (error) {
        // Reset cancel controller if it was a manual cancel
        if (c.vars.cancelController.signal.aborted && !c.aborted) {
          c.vars.cancelController = new AbortController();
          continue;
        }
        throw error;
      }
    }
  },
});

export const registry = setup({ use: { worker } });
</CodeGroup>

Queues for Agents

Queues are a natural fit for AI agents. Use a prompt queue for incoming messages, a stop queue for cancellation, and SQLite for persistent chat history. The run handler processes messages durably, so the agent survives crashes and picks up where it left off.

<CodeGroup> ```ts Simple Agent import { actor, queue, setup } from "rivetkit"; import { db } from "rivetkit/db"; import { generateText, tool } from "ai"; import { openai } from "@ai-sdk/openai"; import { z } from "zod";

const agent = actor({ // SQLite for persistent chat history db: db({ onMigrate: async (db) => { await db.execute( CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, role TEXT NOT NULL, content TEXT NOT NULL ) ); }, }), queues: { prompt: queue<{ content: string }>(), }, run: async (c) => { for await (const message of c.queue.iter()) { // Save user message to SQLite await c.db.execute( "INSERT INTO messages (role, content) VALUES (?, ?)", "user", message.body.content, );

  // Load full chat history
  const history = (await c.db.execute(
    "SELECT role, content FROM messages ORDER BY id",
  )) as { role: string; content: string }[];

  // Generate with tool use
  const result = await generateText({
    model: openai("gpt-5"),
    messages: history.map((row) => ({
      role: row.role as "user" | "assistant",
      content: row.content,
    })),
    tools: {
      getWeather: tool({
        description: "Get the weather for a location",
        parameters: z.object({ location: z.string() }),
        execute: async ({ location }) => `72F in ${location}`,
      }),
    },
    maxSteps: 5,
  });

  // Save assistant response
  await c.db.execute(
    "INSERT INTO messages (role, content) VALUES (?, ?)",
    "assistant",
    result.text,
  );
}

}, });

export const registry = setup({ use: { agent } });


```ts Cancellable Agent
import { actor, queue, setup } from "rivetkit";
import { db } from "rivetkit/db";
import { generateText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { joinSignals } from "rivetkit/utils";
import { z } from "zod";

const agent = actor({
  db: db({
    onMigrate: async (db) => {
      await db.execute(`
        CREATE TABLE IF NOT EXISTS messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          role TEXT NOT NULL,
          content TEXT NOT NULL
        )
      `);
    },
  }),
  queues: {
    // Separate queues for prompts and stop signals
    prompt: queue<{ content: string }, undefined>(),
    stop: queue<{ reason?: string }>(),
  },
  run: async (c) => {
    for await (const message of c.queue.iter({
      names: ["prompt"],
      completable: true,
    })) {
      const stopController = new AbortController();
      const runSignal = joinSignals(c.abortSignal, stopController.signal);

      // Cancel generation if a stop message arrives
      c.queue
        .next({ names: ["stop"], signal: runSignal })
        .then((stopMessage) => {
          if (stopMessage) stopController.abort();
        })
        .catch(() => {});

      // Save user message to SQLite
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "user",
        message.body.content,
      );

      // Load full chat history
      const history = (await c.db.execute(
        "SELECT role, content FROM messages ORDER BY id",
      )) as { role: string; content: string }[];

      // Generate with tool use
      const result = await generateText({
        model: openai("gpt-5"),
        messages: history.map((row) => ({
          role: row.role as "user" | "assistant",
          content: row.content,
        })),
        tools: {
          getWeather: tool({
            description: "Get the weather for a location",
            parameters: z.object({ location: z.string() }),
            execute: async ({ location }) => `72F in ${location}`,
          }),
        },
        maxSteps: 5,
        abortSignal: runSignal,
      }).finally(() => {
        stopController.abort();
      });

      // Save assistant response
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "assistant",
        result.text,
      );

      await message.complete();
    }
  },
});

export const registry = setup({ use: { agent } });
</CodeGroup>

Request/Response

Three delivery modes depending on what the caller needs:

  • Fire-and-forget: send and move on
  • Completable: send and wait for acknowledgment
  • Request/response: send and await a typed reply

Pairs with Workflows

Feed queue messages into durable workflows. Each workflow step is checkpointed, so crashes pick up where they left off. Combine queues with sleep, join, race, rollback, and human-in-the-loop patterns.

ts
import { actor, queue, setup } from "rivetkit";
import { workflow } from "rivetkit/workflow";

const worker = actor({
  state: { processed: 0 },
  queues: {
    orders: queue<{ orderId: string }>(),
  },
  // Workflow replays safely on crash or restart
  run: workflow(async (ctx) => {
    for await (const message of ctx.queue.iter()) {
      await ctx.step("charge", async () =>
        charge(message.body.orderId),
      );
      await ctx.step("fulfill", async () =>
        fulfill(message.body.orderId),
      );
      await ctx.step("notify", async () =>
        notify(message.body.orderId),
      );
    }
  }),
});

async function charge(orderId: string) { /* ... */ }
async function fulfill(orderId: string) { /* ... */ }
async function notify(orderId: string) { /* ... */ }

export const registry = setup({ use: { worker } });

Built into the Actor

Queues are part of the actor, not a separate service. The same actor has state, SQLite, events, and workflows, all built in. No external broker to provision, no connection strings, no infrastructure to manage.

Plus everything else that comes with Rivet Actors: scale to millions of instances, scale to zero, TypeScript-native, deploy on Cloudflare Workers, Vercel, Railway, or your own infra.

Get Started

Queues are available today in RivetKit.

bash
npm install rivetkit
ts
import { queue } from "rivetkit";