website/src/content/docs/agent-os/queues.mdx
Use actor queues to serialize work that an agent processes one task at a time.
<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";const taskRunner = actor({ queues: { tasks: queue<{ prompt: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["task-agent"]);
for await (const message of c.queue.iter()) {
// Process one task at a time
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(session.sessionId, message.body.prompt);
await agentHandle.closeSession(session.sessionId);
}
}, });
const vm = agentOs({ options: { software: [common, pi] }, });
export const registry = setup({ use: { taskRunner, vm } }); registry.start();
```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.taskRunner.getOrCreate(["main"]);
// Queue up work. Tasks are processed one at a time.
await handle.send("tasks", { prompt: "Review PR #123" });
await handle.send("tasks", { prompt: "Fix the flaky test in auth.test.ts" });
await handle.send("tasks", { prompt: "Update the README" });
Use completable messages when the caller needs to wait for the agent to finish.
<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";const reviewer = actor({ queues: { review: queue<{ file: string }, { summary: string }>(), }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["reviewer"]); const session = await agentHandle.createSession("pi", { env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! }, });
for await (const message of c.queue.iter({ completable: true })) {
const content = await agentHandle.readFile(message.body.file);
const text = new TextDecoder().decode(content);
await agentHandle.sendPrompt(
session.sessionId,
`Review this code and write a summary to /home/user/review.txt:\n\n${text}`,
);
const review = await agentHandle.readFile("/home/user/review.txt");
await message.complete({
summary: new TextDecoder().decode(review),
});
}
}, });
const vm = agentOs({ options: { software: [common, pi] }, });
export const registry = setup({ use: { reviewer, vm } }); registry.start();
```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.reviewer.getOrCreate(["main"]);
// Wait for the agent to complete the review
const result = await handle.send(
"review",
{ file: "/home/user/src/auth.ts" },
{ wait: true, timeout: 120_000 },
);
if (result.status === "completed") {
console.log("Review:", result.response.summary);
}
Accept tasks from webhooks, APIs, or other services and queue them for agent processing.
<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, queue, setup } from "rivetkit";const issueWorker = actor({ queues: { issues: queue<{ title: string; body: string }>(), }, actions: { // HTTP endpoint to receive webhook payloads ingestIssue: async (c, title: string, body: string) => { await c.queue.push("issues", { title, body }); }, }, run: async (c) => { const agentHandle = c.actors.vm.getOrCreate(["issue-worker"]);
for await (const message of c.queue.iter()) {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Investigate and fix this issue:\n\nTitle: ${message.body.title}\n\n${message.body.body}`,
);
await agentHandle.closeSession(session.sessionId);
}
}, });
const vm = agentOs({ options: { software: [common, pi] }, });
export const registry = setup({ use: { issueWorker, vm } }); registry.start();
```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.issueWorker.getOrCreate(["main"]);
// Ingest from a webhook or external system
await handle.ingestIssue(
"Login redirect broken",
"Users are redirected to /undefined after login on mobile",
);