Back to Rivet

Workflow Automation

website/src/content/docs/agent-os/workflows.mdx

2.2.15.3 KB
Original Source
  • Durable workflows that survive crashes and restarts
  • Multi-step orchestration with sessions, file operations, and process execution
  • Error handling and retry via ctx.step() for each operation
  • Agent chaining where output of one session feeds into the next

Basic workflow

Use the actor workflow() primitive to orchestrate a multi-step agent task. Each step is durable and will resume from where it left off after a restart.

Session creation and prompting must happen within the same step because sessions are ephemeral and won't survive a replay.

<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, setup, workflow } from "rivetkit";

const automator = actor({ workflows: { fixBug: workflow<{ repo: string; issue: string }>(), }, run: async (c) => { for await (const message of c.workflow.iter("fixBug")) { const { repo, issue } = message.body; const agentHandle = c.actors.vm.getOrCreate([fix-${issue}]);

  // Step 1: Clone the repo
  await c.step("clone-repo", async () => {
    return agentHandle.exec(`git clone ${repo} /home/user/repo`);
  });

  // Step 2: Agent fixes the bug (session lives within this step)
  await c.step("fix-bug", async () => {
    const session = await agentHandle.createSession("pi", {
      env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
    });
    await agentHandle.sendPrompt(
      session.sessionId,
      `Fix the bug described in issue: ${issue}`,
    );
    await agentHandle.closeSession(session.sessionId);
  });

  // Step 3: Run tests
  const tests = await c.step("run-tests", async () => {
    return agentHandle.exec("cd /home/user/repo && npm test");
  });

  console.log("Tests exit code:", tests.exitCode);
  await message.complete();
}

}, });

const vm = agentOs({ options: { software: [common, pi] }, });

export const registry = setup({ use: { automator, vm } }); registry.start();


```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.automator.getOrCreate(["main"]);

// Trigger the workflow
await handle.send("fixBug", {
  repo: "https://github.com/example/repo.git",
  issue: "Fix the login redirect bug",
});
</CodeGroup>

Agent chaining

Output of one agent session feeds into the next. Each session is created and completed within its own step.

<CodeGroup stacked> ```ts @nocheck server.ts import { agentOs } from "rivetkit/agent-os"; import common from "@rivet-dev/agent-os-common"; import pi from "@rivet-dev/agent-os-pi"; import { actor, setup, workflow } from "rivetkit";

const pipeline = actor({ workflows: { codeReview: workflow<{ filePath: string }>(), }, run: async (c) => { for await (const message of c.workflow.iter("codeReview")) { const agentHandle = c.actors.vm.getOrCreate([review-${Date.now()}]);

  // Step 1: Agent reviews code and writes findings to a file
  await c.step("review", async () => {
    const session = await agentHandle.createSession("pi", {
      env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
    });
    await agentHandle.sendPrompt(
      session.sessionId,
      `Review the code at ${message.body.filePath} and write your findings to /home/user/review.md`,
    );
    await agentHandle.closeSession(session.sessionId);
  });

  // Step 2: Read the review from the filesystem
  const review = await c.step("read-review", async () => {
    const content = await agentHandle.readFile("/home/user/review.md");
    return new TextDecoder().decode(content);
  });

  // Step 3: Second session applies fixes based on the review
  await c.step("fix", async () => {
    const session = await agentHandle.createSession("pi", {
      env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
    });
    await agentHandle.sendPrompt(
      session.sessionId,
      `Apply the following review feedback:\n\n${review}`,
    );
    await agentHandle.closeSession(session.sessionId);
  });

  await message.complete();
}

}, });

const vm = agentOs({ options: { software: [common, pi] }, });

export const registry = setup({ use: { pipeline, vm } }); registry.start();


```ts @nocheck client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./server";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.pipeline.getOrCreate(["main"]);

await handle.send("codeReview", { filePath: "/home/user/src/auth.ts" });
</CodeGroup>

Recommendations

  • Create and close sessions within the same step. Sessions are ephemeral and won't exist after a workflow replays.
  • Pass data between steps via the filesystem or step return values, not session state.
  • Keep step names stable across code changes. Renaming a step breaks replay for in-progress workflows.
  • Use separate actors for the workflow orchestrator and the agentOS VM.
  • See Workflows for the full workflow API reference including timers, joins, and races.