website/src/content/docs/actors/queues.mdx
Queues are commonly referred to as "mailboxes" in other actor frameworks.
This is the default pattern. Define queue names in queues, process them in run, and publish from the client with handle.send(...).
export const counter = actor({ state: { value: 0 }, queues: { increment: queue<{ amount: number }>(), }, run: async (c) => { for await (const message of c.queue.iter()) { c.state.value += message.body.amount; } }, });
export const registry = setup({ use: { counter } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.counter.getOrCreate(["main"]);
await handle.send("increment", { amount: 1 });
await handle.send("increment", { amount: 5 });
Use this when you want explicit completion/ack semantics but do not need to return data.
message.complete(), the message is not acknowledged.status: "timedOut" means sender timeout elapsed before message.complete(...).export const counter = actor({ state: { value: 0 }, queues: { increment: queue<{ amount: number }, undefined>(), }, run: async (c) => { for await (const message of c.queue.iter({ completable: true })) { c.state.value += message.body.amount; await message.complete(); } }, });
export const registry = setup({ use: { counter } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.counter.getOrCreate(["main"]);
const result = await handle.send(
"increment",
{ amount: 5 },
{ wait: true, timeout: 5_000 },
);
if (result.status === "completed") {
console.log("applied");
} else if (result.status === "timedOut") {
console.log("timed out");
}
Use this when the sender needs data back from queued work.
<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit";export const counter = actor({ state: { value: 0 }, queues: { increment: queue<{ amount: number }, { value: number }>(), }, run: async (c) => { for await (const message of c.queue.iter({ completable: true })) { c.state.value += message.body.amount; await message.complete({ value: c.state.value }); } }, });
export const registry = setup({ use: { counter } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.counter.getOrCreate(["main"]);
const result = await handle.send(
"increment",
{ amount: 5 },
{ wait: true, timeout: 5_000 },
);
if (result.status === "completed") {
console.log(result.response); // { value: 5 }
} else if (result.status === "timedOut") {
console.log("timed out");
}
Queueing is useful from inside actor logic too, not just from clients.
c.queue.send(...) from other parts of run when needed.c.queue.send(...) confirms durable enqueue. It does not wait for processing to finish.export const counter = actor({ state: { value: 0 }, queues: { mutate: queue<{ delta: number }>(), }, run: async (c) => { for await (const message of c.queue.iter()) { c.state.value += message.body.delta; } }, actions: { increment: async (c, delta: number) => { await c.queue.send("mutate", { delta }); }, }, });
export const registry = setup({ use: { counter } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.counter.getOrCreate(["main"]);
await handle.increment(5);
await handle.increment(2);
You can define queue types with queue<TMessage, TComplete>() or with schema objects. Schema objects support Standard Schema validators, including Zod.
import { actor, queue, setup } from "rivetkit";
import { z } from "zod";
export const worker = actor({
state: {},
queues: {
// Use generic queue typing when you want compile-time typing only.
foo: queue<{ id: string }, { ok: true }>(),
// Use schema objects when you want runtime validation for message and completion payloads.
bar: {
message: z.object({ id: z.string() }),
complete: z.object({ ok: z.boolean() }),
},
},
});
export const registry = setup({ use: { worker } });
next and nextBatchUse next when you want to wait for one queue message.
Use nextBatch when you want to wait for multiple queue messages.
timeout to wait indefinitely.import { actor, queue, setup } from "rivetkit";
export const queueWorker = actor({
state: {},
queues: {
jobs: queue<{ id: string }>(),
},
actions: {
pull: async (c) => {
const batch = await c.queue.nextBatch({
count: 10,
timeout: 1_000,
});
const oneWithoutTimeout = await c.queue.next();
return {
batchCount: batch.length,
receivedOneWithoutTimeout: oneWithoutTimeout !== undefined,
};
},
},
});
export const registry = setup({ use: { queueWorker } });
Use tryNext when you need one non-blocking read.
Use tryNextBatch for non-blocking batch reads.
import { actor, queue, setup } from "rivetkit";
export const queueWorker = actor({
state: {},
queues: {
jobs: queue<{ id: string }>(),
},
actions: {
poll: async (c) => {
const immediate = await c.queue.tryNext();
const immediateBatch = await c.queue.tryNextBatch({
count: 10,
});
return {
hasImmediate: immediate !== undefined,
immediateBatchCount: immediateBatch.length,
};
},
},
});
export const registry = setup({ use: { queueWorker } });
Use signal when your receive loop needs external cancellation semantics in addition to actor shutdown behavior.
export const signalWorker = actor({ state: {}, createVars: () => ({ cancelController: new AbortController(), }), queues: { jobs: queue<{ id: string }>(), }, actions: { cancelProcessing: async (c) => { c.vars.cancelController.abort(); }, }, run: async (c) => { while (!c.aborted) { const signal = joinSignals(c.abortSignal, c.vars.cancelController.signal);
try {
const message = await c.queue.next({ signal });
if (!message) continue;
console.log("Processing job", message.body.id);
} catch (error) {
if (c.vars.cancelController.signal.aborted && !c.aborted) {
c.vars.cancelController = new AbortController();
continue;
}
throw error;
}
}
}, });
export const registry = setup({ use: { signalWorker } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.signalWorker.getOrCreate(["main"]);
await handle.send("jobs", { id: "job-1" });
await handle.cancelProcessing();
Multiple queues let you separate message flows by purpose. By default, receive calls race across all queues when names is not specified. In this pattern, prompt messages run through a streaming loop while stop messages act as control signals on a separate receive path.
Use iter({ names: ["prompt"] }) as the main stream and next({ names: ["stop"] }) as a stop signal.
export const agent = actor({ state: { running: false, messages: [] as string[] }, queues: { prompt: queue<{ prompt: string }, undefined>(), stop: queue<{ reason?: string }>(), }, run: async (c) => { for await (const promptMessage of c.queue.iter({ names: ["prompt"], completable: true })) { // Create a stop controller for this prompt run. const stopController = new AbortController(); const runSignal = joinSignals(c.abortSignal, stopController.signal);
// Watch for stop messages while generation is running.
const stopWatcher = c.queue
.next({ names: ["stop"], signal: runSignal })
.then((stopMessage) => {
if (stopMessage) stopController.abort();
})
.catch(() => {});
// Generate a response for the prompt.
c.state.running = true;
const { text } = await generateText({
model: openai("gpt-5"),
prompt: promptMessage.body.prompt,
abortSignal: runSignal,
}).finally(async () => {
stopController.abort();
c.state.running = false;
});
// Append each model response to actor state and acknowledge the prompt.
c.state.messages.push(text);
await promptMessage.complete();
}
}, });
export const registry = setup({ use: { agent } });
```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.agent.getOrCreate(["main"]);
await handle.send("prompt", { prompt: "summarize latest logs" });
await handle.send("stop", { reason: "user canceled" });
If an actor has a run handler, it does not sleep while that handler is actively doing work. It only can sleep when the run loop is blocked waiting for queue entries (for example inside iter(...) or next(...)).
This means you can run normal code in run without worrying about sleep interrupting it mid-call.
GET /inspector/queue?limit=50 returns queue size and pending message metadata.GET /inspector/summary includes queueSize for quick queue health checks.POST /queue/:name with wait: true is useful to verify completable/request-response behavior.onBeforeConnect. See Authentication.c.aborted and c.abortSignal for graceful shutdown and cancellation.timeout when callers need bounded wait behavior.wait: true only when the caller actually needs a response.wait: true between actorswait: true blocks the sender's run loop until the receiver finishes. Between actors, this adds unnecessary overhead and risks deadlocks, especially if the target actor needs to communicate back. If an actor sends a wait: true message to itself, it is a guaranteed deadlock because the run loop is already busy processing the current message.
Reserve wait: true for external callers (HTTP handlers, CLI tools, client apps). For actor-to-actor communication, send a queue message to the other actor without wait: true, then have that actor send a queue message back when the work is done.
Every queue message includes a createdAt timestamp. Use this to skip or discard stale messages in your run loop:
import { actor, queue, setup } from "rivetkit";
export const worker = actor({
state: {},
queues: {
jobs: queue<{ task: string }>(),
},
run: async (c) => {
for await (const message of c.queue.iter()) {
const ageMs = Date.now() - message.createdAt;
if (ageMs > 60_000) {
// Message is older than 60 seconds, skip it.
continue;
}
console.log("Processing", message.body.task);
}
},
});
export const registry = setup({ use: { worker } });
Use c.schedule to enqueue messages at a future time instead of processing them immediately:
import { actor, queue, setup } from "rivetkit";
export const reminder = actor({
state: {},
queues: {
notify: queue<{ userId: string }>(),
},
actions: {
scheduleReminder: async (c, userId: string) => {
// Enqueue a message in 30 seconds.
c.schedule.after(30_000, "enqueueReminder", userId);
},
enqueueReminder: async (c, userId: string) => {
await c.queue.send("notify", { userId });
},
},
run: async (c) => {
for await (const message of c.queue.iter()) {
console.log("Sending reminder to", message.body.userId);
}
},
});
export const registry = setup({ use: { reminder } });
See Schedule for the full scheduling API.