Back to Rivet

Queues

website/src/content/docs/agent-os/queues.mdx

2.2.15.7 KB
Original Source
  • Serial execution ensures agents process one task at a time
  • Durable messages survive sleep and restart
  • Completable messages for request/response patterns with agents
  • Backpressure absorbs bursts and prevents overload

Queue agent commands

Use actor queues to serialize work that an agent processes one task at a time.

<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";

const taskRunner = actor({ queues: { tasks: queue<{ prompt: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["task-agent"]);

for await (const message of c.queue.iter()) {
  // Process one task at a time
  const session = await agentHandle.createSession("pi", {
    env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
  });
  await agentHandle.sendPrompt(session.sessionId, message.body.prompt);
  await agentHandle.closeSession(session.sessionId);
}

}, });

const vm = agentOs({ options: { software: [common, pi] }, });

export const registry = setup({ use: { taskRunner, vm } }); registry.start();


```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.taskRunner.getOrCreate(["main"]);

// Queue up work. Tasks are processed one at a time.
await handle.send("tasks", { prompt: "Review PR #123" });
await handle.send("tasks", { prompt: "Fix the flaky test in auth.test.ts" });
await handle.send("tasks", { prompt: "Update the README" });
</CodeGroup>

Request/response with completable messages

Use completable messages when the caller needs to wait for the agent to finish.

<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";

const reviewer = actor({ queues: { review: queue<{ file: string }, { summary: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["reviewer"]); const session = await agentHandle.createSession("pi", { env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! }, });

for await (const message of c.queue.iter({ completable: true })) {
  const content = await agentHandle.readFile(message.body.file);
  const text = new TextDecoder().decode(content);

  await agentHandle.sendPrompt(
    session.sessionId,
    `Review this code and write a summary to /home/user/review.txt:\n\n${text}`,
  );

  const review = await agentHandle.readFile("/home/user/review.txt");
  await message.complete({
    summary: new TextDecoder().decode(review),
  });
}

}, });

const vm = agentOs({ options: { software: [common, pi] }, });

export const registry = setup({ use: { reviewer, vm } }); registry.start();


```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.reviewer.getOrCreate(["main"]);

// Wait for the agent to complete the review
const result = await handle.send(
  "review",
  { file: "/home/user/src/auth.ts" },
  { wait: true, timeout: 120_000 },
);

if (result.status === "completed") {
  console.log("Review:", result.response.summary);
}
</CodeGroup>

Ingesting from external systems

Accept tasks from webhooks, APIs, or other services and queue them for agent processing.

<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";

const issueWorker = actor({ queues: { issues: queue<{ title: string; body: string }>(), }, actions: { // HTTP endpoint to receive webhook payloads ingestIssue: async (c, title: string, body: string) => { await c.queue.push("issues", { title, body }); }, }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["issue-worker"]);

for await (const message of c.queue.iter()) {
  const session = await agentHandle.createSession("pi", {
    env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
  });
  await agentHandle.sendPrompt(
    session.sessionId,
    `Investigate and fix this issue:\n\nTitle: ${message.body.title}\n\n${message.body.body}`,
  );
  await agentHandle.closeSession(session.sessionId);
}

}, });

const vm = agentOs({ options: { software: [common, pi] }, });

export const registry = setup({ use: { issueWorker, vm } }); registry.start();


```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.issueWorker.getOrCreate(["main"]);

// Ingest from a webhook or external system
await handle.ingestIssue(
  "Login redirect broken",
  "Users are redirected to /undefined after login on mobile",
);
</CodeGroup>

Recommendations

  • Use queues when you need guaranteed serial execution. Agents process one message at a time, preventing race conditions.
  • Use completable messages when the caller needs the result. Set a generous timeout since agent work can take minutes.
  • Queues survive actor sleep. Messages are persisted and processed when the actor wakes up.
  • See Queues & Run Loops for the full queue API reference.