Back to Rivet

Workflows

website/src/content/docs/actors/workflows.mdx

2.2.155.1 KB
Original Source

Use workflows for durable, multi-step execution with replay safety.

What are workflows?

A workflow is a durable, replayable run handler for a Rivet Actor.

  • Survives restarts: workflow progress is saved automatically.
  • Re-runs safely: replay follows the same recorded steps.
  • Event-driven: workflows can pause for queue messages, then continue.

Getting started

Simple workflow

Use this when you need a short multi-step sequence.

<CodeGroup workspace> ```ts index.ts import { actor, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const invoiceActor = actor({ state: { invoiceId: null as string | null, subtotal: 0, tax: 0, total: 0, status: "idle" as "idle" | "complete", }, run: workflow(async (ctx) => { const subtotal = await ctx.step("load-subtotal", async () => loadSubtotal(), );

const tax = await ctx.step("calculate-tax", async () =>
  calculateTax(subtotal),
);

await ctx.step("save-invoice", async () =>
  saveInvoice(ctx, subtotal, tax),
);

}), actions: { getState: (c) => c.state, }, });

async function loadSubtotal(): Promise<number> { const response = await fetch("https://api.example.com/carts/main"); if (!response.ok) { throw new Error(load subtotal failed: ${response.status}); } const cart = (await response.json()) as { subtotal: number }; return cart.subtotal; }

async function calculateTax(subtotal: number): Promise<number> { const response = await fetch("https://api.example.com/tax/quote", { method: "POST", headers: { "content-type": "application/json", }, body: JSON.stringify({ subtotal }), }); if (!response.ok) { throw new Error(tax quote failed: ${response.status}); } const quote = (await response.json()) as { tax: number }; return quote.tax; }

async function saveInvoice( ctx: WorkflowContextOf<typeof invoiceActor>, subtotal: number, tax: number, ): Promise<void> { const total = subtotal + tax; const response = await fetch("https://api.example.com/invoices", { method: "POST", headers: { "content-type": "application/json", }, body: JSON.stringify({ subtotal, tax, total }), }); if (!response.ok) { throw new Error(save invoice failed: ${response.status}); } const invoice = (await response.json()) as { id: string }; ctx.state.invoiceId = invoice.id; ctx.state.subtotal = subtotal; ctx.state.tax = tax; ctx.state.total = total; ctx.state.status = "complete"; }

export const registry = setup({ use: { invoiceActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.invoiceActor.getOrCreate(["main"]);

const state = await handle.getState();
console.log(state.status, state.total);
</CodeGroup>

Loops

This is the recommended workflow shape for most actor workloads.

  • Use a queue wait inside the loop to receive the next unit of work.
  • Keep actor state changes in a single workflow loop.
  • This gives you one durable workflow that manages all actor progress.
<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const workflowCounter = actor({ state: { value: 0, processed: 0, lastOperationId: null as string | null, }, queues: { counter: queue<{ delta: number }>(), }, run: workflow(async (ctx) => { await ctx.loop("counter-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-counter-command");

    await loopCtx.step("apply-counter-command", async () =>
      applyCounterCommand(loopCtx, message.body.delta),
    );

  });

}), actions: { getState: (c) => c.state, }, });

async function applyCounterCommand( ctx: WorkflowLoopContextOf<typeof workflowCounter>, delta: number, ): Promise<void> { const response = await fetch("https://api.example.com/counter/apply", { method: "POST", headers: { "content-type": "application/json", }, body: JSON.stringify({ delta }), }); if (!response.ok) { throw new Error(counter apply failed: ${response.status}); } const result = (await response.json()) as { nextValue: number; operationId: string; }; ctx.state.value = result.nextValue; ctx.state.lastOperationId = result.operationId; ctx.state.processed += 1; }

export const registry = setup({ use: { workflowCounter } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.workflowCounter.getOrCreate(["main"]);

await handle.send("counter", { delta: 1 });
await handle.send("counter", { delta: 2 });

const state = await handle.getState();
console.log(state.value, state.processed);
</CodeGroup>

Setup & teardown

Use this when the workflow should initialize resources, process queued commands, then clean up.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { Loop, type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type WorkMessage = { amount: number }; type ControlMessage = { type: "stop"; reason: string };

export const setupRunTeardownActor = actor({ state: { phase: "idle" as "idle" | "running" | "stopped", total: 0, processed: 0, stopReason: null as string | null, workerSessionId: null as string | null, }, queues: { work: queue<WorkMessage>(), control: queue<ControlMessage>(), }, run: workflow(async (ctx) => { await ctx.step("setup", async () => setupWorkerSession(ctx));

const stopReason = await ctx.loop("worker-loop", async (loopCtx) => {
    const message = await loopCtx.queue.next("wait-command", {
      names: ["work", "control"],
    });

    if (message.name === "work") {
      const work = message.body as WorkMessage;
      await loopCtx.step("apply-work", async () =>
        applyWorkerMessage(loopCtx, work),
      );
      return;
    }

    const control = message.body as ControlMessage;
    if (control.type === "stop") {
      return Loop.break(control.reason);
    }

  });

await ctx.step("teardown", async () =>
  teardownWorkerSession(ctx, stopReason),
);

}), actions: { getState: (c) => c.state, }, });

async function setupWorkerSession( ctx: WorkflowContextOf<typeof setupRunTeardownActor>, ): Promise<void> { const response = await fetch("https://api.example.com/workers/session", { method: "POST", }); if (!response.ok) { throw new Error(worker setup failed: ${response.status}); } const session = (await response.json()) as { sessionId: string }; ctx.state.workerSessionId = session.sessionId; ctx.state.phase = "running"; ctx.state.stopReason = null; }

async function applyWorkerMessage( ctx: WorkflowLoopContextOf<typeof setupRunTeardownActor>, work: WorkMessage, ): Promise<void> { const response = await fetch("https://api.example.com/workers/process", { method: "POST", headers: { "content-type": "application/json", }, body: JSON.stringify({ sessionId: ctx.state.workerSessionId, amount: work.amount, }), }); if (!response.ok) { throw new Error(worker process failed: ${response.status}); } const result = (await response.json()) as { appliedAmount: number }; ctx.state.total += result.appliedAmount; ctx.state.processed += 1; }

async function teardownWorkerSession( ctx: WorkflowContextOf<typeof setupRunTeardownActor>, stopReason: string, ): Promise<void> { if (ctx.state.workerSessionId) { const response = await fetch( https://api.example.com/workers/session/${ctx.state.workerSessionId}, { method: "DELETE" }, ); if (!response.ok) { throw new Error(worker teardown failed: ${response.status}); } } ctx.state.phase = "stopped"; ctx.state.stopReason = stopReason; }

export const registry = setup({ use: { setupRunTeardownActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.setupRunTeardownActor.getOrCreate(["main"]);

await handle.send("work", { amount: 5 });
await handle.send("work", { amount: 3 });
await handle.send("control", { type: "stop", reason: "maintenance" });

const state = await handle.getState();
console.log(state.phase, state.total, state.stopReason);
</CodeGroup>

Features

Queue

Use this for fire-and-forget commands where the client does not need a reply.

Use the Loops example above as the baseline pattern.

Request/response (using queue)

Use this when the caller needs a response from queued processing.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const requestResponseActor = actor({ state: { handled: 0, }, queues: { requests: queue<{ value: number }, { doubled: number }>(), }, run: workflow(async (ctx) => { await ctx.loop("request-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-request", { completable: true, });

    if (!message.complete) return;

    const doubled = await loopCtx.step("handle-request", async () => {
      loopCtx.state.handled += 1;
      return message.body.value * 2;
    });

    await message.complete({ doubled });
  });

}), });

export const registry = setup({ use: { requestResponseActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.requestResponseActor.getOrCreate(["main"]);

const result = await handle.send(
  "requests",
  { value: 21 },
  { wait: true, timeout: 1_000 },
);

if (result.status === "completed") {
  const response = result.response as { doubled: number };
  console.log(response.doubled);
}
</CodeGroup>

Timers

Use queue messages as the trigger source, then sleep durably inside the workflow.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type Reminder = { text: string; at: number; };

export const reminderActor = actor({ state: { fired: [] as string[], }, queues: { reminders: queue<Reminder>(), }, run: workflow(async (ctx) => { await ctx.loop("reminder-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-reminder");

    const runAt = Math.max(Date.now(), message.body.at);
    await loopCtx.sleepUntil("wait-until-reminder", runAt);

    await loopCtx.step("record-reminder", async () => {
      loopCtx.state.fired.push(message.body.text);
    });

  });

}), actions: { getState: (c) => c.state, }, });

export const registry = setup({ use: { reminderActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.reminderActor.getOrCreate(["main"]);

await handle.send("reminders", {
  text: "send weekly report",
  at: Date.now() + 1_000,
});

await new Promise((resolve) => setTimeout(resolve, 1_300));
console.log(await handle.getState());
</CodeGroup>

Join

Use join when several independent tasks can run in parallel.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const dashboardActor = actor({ state: { summary: null as null | { users: number; orders: number; revenue: number; }, }, queues: { refresh: queue<Record<string, never>>(), }, run: workflow(async (ctx) => { await ctx.loop("dashboard-loop", async (loopCtx) => { await loopCtx.queue.next("wait-refresh");

    const summary = await loopCtx.join("fetch-summary", {
      users: {
        run: async (branchCtx) => {
          return await branchCtx.step("fetch-users", () => fetchCount("/users"));
        },
      },
      orders: {
        run: async (branchCtx) => {
          return await branchCtx.step("fetch-orders", () => fetchCount("/orders"));
        },
      },
      revenue: {
        run: async (branchCtx) => {
          return await branchCtx.step("fetch-revenue", () => fetchCount("/revenue"));
        },
      },
    });

    await loopCtx.step("save-summary", async () => {
      loopCtx.state.summary = summary;
    });

  });

}), actions: { getState: (c) => c.state, }, });

async function fetchCount(path: string): Promise<number> { const res = await fetch(https://api.example.com${path}); if (!res.ok) throw new Error(fetch ${path} failed: ${res.status}); return ((await res.json()) as { count: number }).count; }

export const registry = setup({ use: { dashboardActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.dashboardActor.getOrCreate(["main"]);

await handle.send("refresh", {});
console.log(await handle.getState());
</CodeGroup>

Race

Use race when you need first-winner behavior.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const auctionActor = actor({ state: { result: null as "sold" | "expired" | null }, queues: { bids: queue<{ amount: number }>(), }, run: workflow(async (ctx) => { await ctx.step("list-item", () => listItem("item-123"));

const { winner } = await ctx.race("bid-or-expire", [
  {
    name: "bid",
    run: async (branchCtx) => {
      const bid = await branchCtx.queue.next("wait-bid");
      return bid.body.amount;
    },
  },
  {
    name: "expire",
    run: async (branchCtx) => {
      await branchCtx.sleep("auction-timeout", 24 * 60 * 60 * 1000);
      return 0;
    },
  },
]);

await ctx.step("finalize", async () => {
  await finalizeAuction("item-123", winner);
  ctx.state.result = winner === "bid" ? "sold" : "expired";
});

}), actions: { getState: (c) => c.state, }, });

async function listItem(itemId: string): Promise<void> { await fetch(https://api.example.com/auctions/${itemId}, { method: "POST", }); }

async function finalizeAuction( itemId: string, outcome: string, ): Promise<void> { await fetch(https://api.example.com/auctions/${itemId}/finalize, { method: "POST", body: JSON.stringify({ outcome }), }); }

export const registry = setup({ use: { auctionActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.auctionActor.getOrCreate(["item-123"]);

await handle.send("bids", { amount: 100 });
console.log(await handle.getState());
</CodeGroup>

Timeouts

Use step timeouts and retries for slow or flaky dependencies.

ts
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

async function chargeCard(orderId: string): Promise<string> {
  return `charge-${orderId}`;
}

export const timeoutActor = actor({
  state: {
    lastChargeId: null as string | null,
  },
  queues: {
    charge: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop("charge-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-charge");

        const chargeId = await loopCtx.step<string>({
          name: "charge-card",
          timeout: 5_000,
          maxRetries: 5,
          retryBackoffBase: 200,
          retryBackoffMax: 2_000,
          run: async () => await chargeCard(message.body.orderId),
        });

        await loopCtx.step("save-charge", async () => {
          loopCtx.state.lastChargeId = chargeId;
        });

      });
  }),
});

export const registry = setup({ use: { timeoutActor } });

Handling terminal failures as data

Use tryStep when a step failure should produce data instead of failing the whole workflow.

ts
import { actor, setup } from "rivetkit";
import { workflow } from "rivetkit/workflow";

export const paymentActor = actor({
  state: {
    status: "pending" as "pending" | "manual-review" | "paid",
    reason: null as string | null,
  },
  run: workflow(async (ctx) => {
    const charge = await ctx.tryStep({
      name: "charge-card",
      maxRetries: 3,
      run: async () => await chargeCard("order-123"),
    });

    await ctx.step("store-charge-result", async () => {
      if (!charge.ok) {
        ctx.state.status = "manual-review";
        ctx.state.reason = charge.failure.error.message;
        return;
      }

      ctx.state.status = "paid";
      ctx.state.reason = null;
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function chargeCard(orderId: string): Promise<string> {
  return `charge-${orderId}`;
}

export const registry = setup({ use: { paymentActor } });

Use try when you want to recover from terminal step, join, or race failures inside a named block.

ts
async function runPaymentFlow(ctx: any) {
  return await ctx.try("payment-flow", async (blockCtx: any) => {
    const auth = await blockCtx.step("authorize", async () =>
      authorizeOrder("order-123"),
    );
    const capture = await blockCtx.step("capture", async () =>
      captureOrder("order-123"),
    );
    return { auth, capture };
  });
}

async function authorizeOrder(orderId: string): Promise<string> {
  return `auth-${orderId}`;
}

async function captureOrder(orderId: string): Promise<string> {
  return `capture-${orderId}`;
}
  • tryStep and try only catch terminal failures. Retry backoff, sleeps, queue waits, eviction, and history divergence still rethrow.
  • RollbackError is not caught by default. Pass catch: ["rollback"] when you want rollback failures returned as data.

Error hooks

Use onError when you want a best-effort notification for workflow failures.

  • Step failures include the attempt number, retry counts, whether the step will retry, and the next retry delay.
  • Workflow failures also include terminal errors outside steps, such as rollback failures or code/history mismatches.
  • The hook is observational. It is not part of workflow replay, so use it for logging, metrics, or updating non-critical actor state.
  • This is also a good place to forward workflow failures to Sentry or another error reporting pipeline.
ts
import { actor, event, setup } from "rivetkit";
import { workflow, type WorkflowErrorEvent } from "rivetkit/workflow";

export const errorHookActor = actor({
  state: {
    lastError: null as WorkflowErrorEvent | null,
  },
  events: {
    workflowError: event<[WorkflowErrorEvent]>(),
  },
  run: workflow(
    async (ctx) => {
      await ctx.step({
        name: "sync-ledger",
        maxRetries: 3,
        retryBackoffBase: 250,
        retryBackoffMax: 1_000,
        run: async () => {
          throw new Error("ledger unavailable");
        },
      });
    },
    {
      onError: (c, event) => {
        c.state.lastError = event;
        c.broadcast("workflowError", event);
      },
    },
  ),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { errorHookActor } });

Rollback

Use rollback checkpoints before steps that have compensating actions.

ts
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const checkoutActor = actor({
  state: { status: "pending" as string },
  queues: {
    orders: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop("checkout-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-order");

        await loopCtx.rollbackCheckpoint("checkout-checkpoint");

        await loopCtx.step<string>({
          name: "reserve-inventory",
          run: () => reserveInventory(message.body.orderId),
          rollback: async (_rollbackCtx, id) => {
            await releaseInventory(id as string);
          },
        });

        await loopCtx.step<string>({
          name: "charge-card",
          run: () => chargeCard(message.body.orderId),
          rollback: async (_rollbackCtx, chargeId) => {
            await refundCharge(chargeId as string);
          },
        });

        await loopCtx.step("confirm", async () => {
          loopCtx.state.status = "confirmed";
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function reserveInventory(orderId: string): Promise<string> {
  const res = await fetch("https://api.example.com/inventory/reserve", {
    method: "POST",
    body: JSON.stringify({ orderId }),
  });
  return ((await res.json()) as { reservationId: string }).reservationId;
}

async function releaseInventory(reservationId: string): Promise<void> {
  await fetch(`https://api.example.com/inventory/${reservationId}/release`, {
    method: "POST",
  });
}

async function chargeCard(orderId: string): Promise<string> {
  const res = await fetch("https://api.stripe.com/v1/charges", {
    method: "POST",
    headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
    body: JSON.stringify({ orderId }),
  });
  return ((await res.json()) as { id: string }).id;
}

async function refundCharge(chargeId: string): Promise<void> {
  await fetch("https://api.stripe.com/v1/refunds", {
    method: "POST",
    headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
    body: JSON.stringify({ charge: chargeId }),
  });
}

export const registry = setup({ use: { checkoutActor } });

Patterns

Store workflow progress in state + broadcast

Store progress in state so replay and recovery always restore it. Broadcast state changes so clients can render progress in realtime.

<CodeGroup workspace> ```ts index.ts import { actor, event, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type Progress = { stage: "idle" | "running" | "completed"; completed: number; total: number; };

export const progressActor = actor({ state: { progress: { stage: "idle", completed: 0, total: 0, } as Progress, sum: 0, }, events: { progressUpdated: event<Progress>(), }, queues: { jobs: queue<{ value: number }>(), }, run: workflow(async (ctx) => { await ctx.loop("progress-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-job");

    await loopCtx.step("mark-running", async () =>
      markProgressRunning(loopCtx),
    );

    await loopCtx.step("apply-job", async () =>
      applyProgressJob(loopCtx, message.body.value),
    );

  });

}), actions: { getState: (c) => c.state, }, });

function markProgressRunning(ctx: WorkflowLoopContextOf<typeof progressActor>): void { ctx.state.progress = { stage: "running", completed: ctx.state.progress.completed, total: ctx.state.progress.total + 1, }; ctx.broadcast("progressUpdated", ctx.state.progress); }

function applyProgressJob( ctx: WorkflowLoopContextOf<typeof progressActor>, value: number, ): void { ctx.state.sum += value; ctx.state.progress = { stage: "completed", completed: ctx.state.progress.completed + 1, total: ctx.state.progress.total, }; ctx.broadcast("progressUpdated", ctx.state.progress); }

export const registry = setup({ use: { progressActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.progressActor.getOrCreate(["main"]);
const conn = handle.connect();

conn.on("progressUpdated", (progress) => {
  console.log("progress", progress);
});

await handle.send("jobs", { value: 5 });
await handle.send("jobs", { value: 7 });

console.log(await handle.getState());
</CodeGroup>

Cron (queue-driven)

Rivet scheduling triggers actions. For cron-like workflows, use a small scheduled action as a bridge that enqueues work, then process that work in the workflow loop.

ts
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

function nextMinute(timestamp: number): number {
  const minuteMs = 60_000;
  return Math.floor(timestamp / minuteMs) * minuteMs + minuteMs;
}

export const cronActor = actor({
  state: {
    runs: 0,
    lastRunAt: null as number | null,
  },
  queues: {
    "cron-tick": queue<{ scheduledAt: number }>(),
  },
  onCreate: async (c) => {
    const firstTickAt = nextMinute(Date.now());
    await c.schedule.at(firstTickAt, "enqueueCronTick", firstTickAt);
  },
  actions: {
    enqueueCronTick: async (c, scheduledAt: number) => {
      await c.queue.send("cron-tick", { scheduledAt });

      const nextTickAt = nextMinute(scheduledAt + 1);
      await c.schedule.at(nextTickAt, "enqueueCronTick", nextTickAt);
    },
    getState: (c) => c.state,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("cron-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-cron-tick");

        await loopCtx.step("run-cron-job", async () => {
          loopCtx.state.runs += 1;
          loopCtx.state.lastRunAt = message.body.scheduledAt;
        });

      });
  }),
});

export const registry = setup({ use: { cronActor } });

These are common workflow shapes used in production systems.

Queue-driven worker

Use this when external systems enqueue work and the actor should process each item durably.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type Job = { id: string; amount: number };

export const queueWorkerActor = actor({
  state: {
    processed: 0,
    totalAmount: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("worker-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-job", {
          timeout: 30_000,
        });

        if (!message) return;
        const job = message.body as Job;

        await loopCtx.step("process-job", async () => {
          loopCtx.state.processed += 1;
          loopCtx.state.totalAmount += job.amount;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { queueWorkerActor } });

Setup & teardown

Use this when you need one-time initialization before a long-lived loop, plus cleanup when the actor stops sleeping or is destroyed.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

function openResource(): string {
  return "connected";
}

function closeResource(_resource: string): void {}

export const setupRunTeardownActor = actor({
  vars: {
    resource: null as string | null,
  },
  state: {
    initialized: false,
    ticks: 0,
  },
  onWake: (c) => {
    c.vars.resource = openResource();
  },
  onSleep: (c) => {
    if (!c.vars.resource) return;
    closeResource(c.vars.resource);
    c.vars.resource = null;
  },
  run: workflow(async (ctx) => {
    await ctx.step("setup", async () => {
      if (!ctx.vars.resource) ctx.vars.resource = openResource();
      ctx.state.initialized = true;
    });

    await ctx.loop("main-loop", async (loopCtx) => {
        await loopCtx.sleep("tick", 1_000);
        await loopCtx.step("tick-step", async () => {
          loopCtx.state.ticks += 1;
        });
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { setupRunTeardownActor } });

Human approval gate

Use this when an operation must pause for a user or system decision before continuing.

ts
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const approvalGateActor = actor({
  state: { status: "pending" as string },
  queues: {
    approval: queue<{ approved: boolean }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.step("validate-order", async () => {
      await validateOrder("order-123");
      ctx.state.status = "awaiting_approval";
    });

    const decision = await ctx.queue.next("wait-approval");

    if (decision.body.approved) {
      await ctx.step("fulfill-order", async () => {
        await fulfillOrder("order-123");
        ctx.state.status = "fulfilled";
      });
    } else {
      await ctx.step("cancel-order", async () => {
        await cancelOrder("order-123");
        ctx.state.status = "cancelled";
      });
    }
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function validateOrder(orderId: string): Promise<void> {
  const res = await fetch(
    `https://api.example.com/orders/${orderId}/validate`,
    { method: "POST" },
  );
  if (!res.ok) throw new Error("Order validation failed");
}

async function fulfillOrder(orderId: string): Promise<void> {
  await fetch(`https://api.example.com/orders/${orderId}/fulfill`, {
    method: "POST",
  });
}

async function cancelOrder(orderId: string): Promise<void> {
  await fetch(`https://api.example.com/orders/${orderId}/cancel`, {
    method: "POST",
  });
}

export const registry = setup({ use: { approvalGateActor } });

Fan-out / fan-in (join)

Use this when independent work items can run in parallel and you need a single merged result.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const fanInOutActor = actor({
  state: {
    total: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("join-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-refresh", {
          timeout: 30_000,
        });

        if (!message) return;

        const joined = await loopCtx.join("parallel-work", {
          users: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-users", () => fetchCount("/users")),
          },
          orders: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-orders", () => fetchCount("/orders")),
          },
          invoices: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-invoices", () => fetchCount("/invoices")),
          },
        });

        await loopCtx.step("merge-results", async () => {
          loopCtx.state.total =
            joined.users + joined.orders + joined.invoices;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function fetchCount(path: string): Promise<number> {
  const res = await fetch(`https://api.example.com${path}`);
  if (!res.ok) throw new Error(`fetch ${path} failed: ${res.status}`);
  return ((await res.json()) as { count: number }).count;
}

export const registry = setup({ use: { fanInOutActor } });

Batch drainer

Use this when throughput matters and handling one message at a time is too expensive.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type MetricMessage = { value: number };

export const batchDrainerActor = actor({
  state: {
    pending: [] as number[],
    flushedBatches: 0,
    lastBatchTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("drain-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-metric", {
          timeout: 5_000,
        });

        if (message) {
          const metric = message.body as MetricMessage;
          await loopCtx.step("buffer-message", async () => {
            loopCtx.state.pending.push(metric.value);
          });
        }

        if (loopCtx.state.pending.length < 5) return;

        await loopCtx.step("flush-batch", async () => flushBatch(loopCtx));

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function flushBatch(ctx: WorkflowLoopContextOf<typeof batchDrainerActor>): void {
  const total = ctx.state.pending.reduce(
    (sum: number, value: number) => sum + value,
    0,
  );
  ctx.state.lastBatchTotal = total;
  ctx.state.flushedBatches += 1;
  ctx.state.pending = [];
}

export const registry = setup({ use: { batchDrainerActor } });

Coordinator -> worker RPC

Use this when one actor orchestrates work by calling actions on other actors.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type TaskMessage = {
  taskId: string;
  workerId: string;
  value: number;
};

export const workerActor = actor({
  actions: {
    runTask: async (_c, value: number) => value * 2,
  },
});

export const coordinatorActor = actor({
  state: {
    lastTaskId: null as string | null,
    lastResult: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("orchestrator-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-task", {
          timeout: 30_000,
        });

        if (!message) return;
        const task = message.body as TaskMessage;

        const result = await loopCtx.step("dispatch-rpc", async () =>
          dispatchTask(loopCtx, task),
        );

        await loopCtx.step("record-result", async () => {
          loopCtx.state.lastTaskId = task.taskId;
          loopCtx.state.lastResult = result as number;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function dispatchTask(
  ctx: WorkflowLoopContextOf<typeof coordinatorActor>,
  task: TaskMessage,
): Promise<number> {
  const client = ctx.client();
  const worker = client.workerActor.getOrCreate([task.workerId]);
  return await worker.runTask(task.value);
}

export const registry = setup({ use: { coordinatorActor, workerActor } });

Request/response over queue (async RPC)

Use this when you want decoupled actor-to-actor communication with durable waits and explicit completion.

<CodeGroup workspace> ```ts index.ts import { actor, queue, setup } from "rivetkit"; import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type RequestMessage = { value: number };

export const requestResponseActor = actor({ state: { handled: 0, }, queues: { requests: queue<RequestMessage, { doubled: number }>(), }, run: workflow(async (ctx) => { await ctx.loop("request-response-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-request", { completable: true, });

    if (!message.complete) return;

    const doubled = await loopCtx.step("handle-request", async () => {
      loopCtx.state.handled += 1;
      return message.body.value * 2;
    });

    await message.complete({ doubled });
  });

}), });

export const registry = setup({ use: { requestResponseActor } });


```ts client.ts
import { createClient } from "rivetkit/client";
import type { registry } from "./index";

const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.requestResponseActor.getOrCreate(["main"]);

const result = await handle.send("requests", { value: 21 }, { wait: true });

if (result.status === "completed") {
  const response = result.response as { doubled: number };
  console.log(response.doubled);
}
</CodeGroup>

Scatter-gather across actors

Use this when multiple actors can process independent parts of a request in parallel, then return a merged response.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type ScatterMessage = { input: number };

export const shardActor = actor({
  actions: {
    compute: async (_c, input: number) => input * 10,
  },
});

export const scatterGatherActor = actor({
  state: {
    lastSum: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("scatter-gather-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-scatter", {
          timeout: 30_000,
        });

        if (!message) return;
        const scatter = message.body as ScatterMessage;

        const gathered = await loopCtx.join("gather", {
          shardA: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-a", async () =>
                callShard(joinCtx, "a", scatter.input),
              ),
          },
          shardB: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-b", async () =>
                callShard(joinCtx, "b", scatter.input),
              ),
          },
          shardC: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-c", async () =>
                callShard(joinCtx, "c", scatter.input),
              ),
          },
        });

        await loopCtx.step("aggregate", async () => {
          loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function callShard(
  ctx: WorkflowLoopContextOf<typeof scatterGatherActor>,
  shardId: "a" | "b" | "c",
  input: number,
): Promise<number> {
  const client = ctx.client();
  const handle = client.shardActor.getOrCreate([shardId]);
  return await handle.compute(input);
}

export const registry = setup({ use: { scatterGatherActor, shardActor } });

Timeout + fallback actor

Use this when a primary actor call might be slow or unavailable and you need a deterministic fallback path.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const primaryServiceActor = actor({
  actions: {
    fetchValue: async () => {
      await new Promise((resolve) => setTimeout(resolve, 500));
      return "primary";
    },
  },
});

export const fallbackServiceActor = actor({
  actions: {
    fetchValue: async () => "fallback",
  },
});

export const timeoutFallbackActor = actor({
  state: {
    lastSource: "none" as "none" | "primary" | "fallback",
    lastValue: "",
  },
  run: workflow(async (ctx) => {
    await ctx.loop("timeout-loop", async (loopCtx) => {
        await loopCtx.queue.nextBatch("wait-request", {
          timeout: 30_000,
        });

        const winner = await loopCtx.race("primary-vs-timeout", [
          {
            name: "primary",
            run: async (raceCtx) =>
              await raceCtx.step("call-primary", async () =>
                callPrimaryValue(raceCtx),
              ),
          },
          {
            name: "timeout",
            run: async (raceCtx) => {
              await raceCtx.sleep("primary-timeout", 200);
              return "timeout";
            },
          },
        ]);

        let value = winner.value as string;
        let source: "primary" | "fallback" = "primary";

        if (winner.winner === "timeout") {
          value = (await loopCtx.step("fallback-call", async () =>
            callFallbackValue(loopCtx),
          )) as string;
          source = "fallback";
        }

        await loopCtx.step("record-choice", async () => {
          loopCtx.state.lastSource = source;
          loopCtx.state.lastValue = value;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function callPrimaryValue(
  ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
  const client = ctx.client();
  const primary = client.primaryServiceActor.getOrCreate(["main"]);
  return await primary.fetchValue();
}

async function callFallbackValue(
  ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
  const client = ctx.client();
  const fallback = client.fallbackServiceActor.getOrCreate(["main"]);
  return await fallback.fetchValue();
}

export const registry = setup({
  use: { timeoutFallbackActor, primaryServiceActor, fallbackServiceActor },
});

Cross-actor saga (compensating actions)

Use this when a workflow spans multiple actors and each side effect may need compensation.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type CheckoutMessage = {
  orderId: string;
  amount: number;
};

export const inventoryActor = actor({
  actions: {
    reserve: async (_c, orderId: string) => `reserve-${orderId}`,
    release: async (_c, reservationId: string) => reservationId,
  },
});

export const billingActor = actor({
  actions: {
    charge: async (_c, amount: number) => `charge-${amount}`,
    refund: async (_c, chargeId: string) => chargeId,
  },
});

export const checkoutSagaActor = actor({
  state: {
    completedOrders: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("checkout-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-order", {
          timeout: 30_000,
        });

        if (!message) return;
        const checkout = message.body as CheckoutMessage;

        await loopCtx.rollbackCheckpoint("checkout-saga");

        await loopCtx.step({
          name: "reserve-inventory",
          run: async () => reserveInventoryForCheckout(loopCtx, checkout.orderId),
          rollback: async (_rollbackCtx, output) => {
            await releaseInventoryForCheckout(loopCtx, output as string);
          },
        });

        await loopCtx.step({
          name: "charge-card",
          run: async () => chargeCheckout(loopCtx, checkout.amount),
          rollback: async (_rollbackCtx, output) => {
            await refundCheckout(loopCtx, output as string);
          },
        });

        await loopCtx.step("mark-complete", async () => markOrderComplete(loopCtx));

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function reserveInventoryForCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  orderId: string,
): Promise<string> {
  const client = ctx.client();
  const inventory = client.inventoryActor.getOrCreate(["main"]);
  return await inventory.reserve(orderId);
}

async function releaseInventoryForCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  reservationId: string,
): Promise<void> {
  const client = ctx.client();
  const inventory = client.inventoryActor.getOrCreate(["main"]);
  await inventory.release(reservationId);
}

async function chargeCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  amount: number,
): Promise<string> {
  const client = ctx.client();
  const billing = client.billingActor.getOrCreate(["main"]);
  return await billing.charge(amount);
}

async function refundCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  chargeId: string,
): Promise<void> {
  const client = ctx.client();
  const billing = client.billingActor.getOrCreate(["main"]);
  await billing.refund(chargeId);
}

function markOrderComplete(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
): void {
  ctx.state.completedOrders += 1;
}

export const registry = setup({
  use: { checkoutSagaActor, inventoryActor, billingActor },
});

Signal-driven control loop

Use this when workflow progress should be triggered by commands/events instead of fixed polling intervals.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type ControlSignal = { kind: "pause" | "resume" | "stop" };

export const controlLoopActor = actor({
  state: {
    mode: "running" as "running" | "paused" | "stopped",
    handledSignals: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("control-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-signal", {
          timeout: 30_000,
        });

        if (!message) return;
        const signal = message.body as ControlSignal;

        await loopCtx.step("apply-signal", async () =>
          applyControlSignal(loopCtx, signal.kind),
        );

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function applyControlSignal(
  ctx: WorkflowLoopContextOf<typeof controlLoopActor>,
  kind: ControlSignal["kind"],
): void {
  ctx.state.handledSignals += 1;
  if (kind === "pause") ctx.state.mode = "paused";
  if (kind === "resume") ctx.state.mode = "running";
  if (kind === "stop") ctx.state.mode = "stopped";
}

export const registry = setup({ use: { controlLoopActor } });

Poll + backoff loop

Use this when an external dependency has variable availability and retries should slow down after failures.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

async function pollExternal(attempt: number): Promise<boolean> {
  return attempt % 3 === 0;
}

export const pollBackoffActor = actor({
  state: {
    attempts: 0,
    backoffMs: 100,
    status: "unknown" as "unknown" | "healthy" | "retrying",
  },
  run: workflow(async (ctx) => {
    await ctx.loop("poll-loop", async (loopCtx) => {
        const success = await loopCtx.step("poll-target", async () => {
          loopCtx.state.attempts += 1;
          return pollExternal(loopCtx.state.attempts);
        });

        if (success) {
          await loopCtx.step("reset-backoff", async () => {
            loopCtx.state.status = "healthy";
            loopCtx.state.backoffMs = 100;
          });
          await loopCtx.sleep("healthy-interval", 1_000);
          return;
        }

        await loopCtx.step("grow-backoff", async () => {
          loopCtx.state.status = "retrying";
          loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000);
        });

        await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs);
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { pollBackoffActor } });

Child worker orchestration

Use this when one workflow coordinates many child workers (actors or worker workflows) and manages their lifecycle.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type BatchMessage = { payload: number };

export const childWorkerActor = actor({
  actions: {
    process: async (_c, payload: number) => payload * 3,
  },
});

export const orchestratorActor = actor({
  state: {
    lastTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("start-children", async () => startChildren(ctx));

    await ctx.loop("orchestrate-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-batch", {
          timeout: 30_000,
        });

        if (!message) return;
        const batch = message.body as BatchMessage;

        const results = await loopCtx.join("collect-updates", {
          a: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-a", async () =>
                runChildWorker(joinCtx, "child-a", batch.payload),
              ),
          },
          b: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-b", async () =>
                runChildWorker(joinCtx, "child-b", batch.payload),
              ),
          },
          c: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-c", async () =>
                runChildWorker(joinCtx, "child-c", batch.payload),
              ),
          },
        });

        await loopCtx.step("reconcile", async () => {
          loopCtx.state.lastTotal = results.a + results.b + results.c;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function startChildren(
  ctx: WorkflowContextOf<typeof orchestratorActor>,
): Promise<void> {
  const client = ctx.client();
  await client.childWorkerActor.getOrCreate(["child-a"]).process(0);
  await client.childWorkerActor.getOrCreate(["child-b"]).process(0);
  await client.childWorkerActor.getOrCreate(["child-c"]).process(0);
}

async function runChildWorker(
  ctx: WorkflowBranchContextOf<typeof orchestratorActor>,
  workerId: "child-a" | "child-b" | "child-c",
  payload: number,
): Promise<number> {
  const client = ctx.client();
  return await client.childWorkerActor.getOrCreate([workerId]).process(payload);
}

export const registry = setup({ use: { orchestratorActor, childWorkerActor } });

Bounded drain + concurrency cap

Use this when inbound work can spike and you need predictable per-iteration limits.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type WorkMessage = { id: string; value: number };

const MAX_PER_ITERATION = 10;
const CONCURRENCY_LIMIT = 3;

async function processWork(value: number): Promise<number> {
  return value * 2;
}

async function runWithLimit<T>(
  limit: number,
  items: T[],
  fn: (item: T) => Promise<void>,
): Promise<void> {
  let nextIndex = 0;
  const workers = Array.from({ length: limit }, async () => {
    while (nextIndex < items.length) {
      const current = items[nextIndex];
      nextIndex += 1;
      await fn(current);
    }
  });
  await Promise.all(workers);
}

export const boundedDrainActor = actor({
  state: {
    processed: 0,
    lastWindowSize: 0,
    lastWindowTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("bounded-drain-loop", async (loopCtx) => {
        const window: WorkMessage[] = [];

        for (let i = 0; i < MAX_PER_ITERATION; i += 1) {
          const [message] = await loopCtx.queue.nextBatch("wait-work", {
            timeout: i === 0 ? 30_000 : 10,
          });
          if (!message) break;
          window.push(message.body as WorkMessage);
        }

        if (window.length === 0) return;

        await loopCtx.step("process-window", async () =>
          processWindow(loopCtx, window),
        );

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function processWindow(
  ctx: WorkflowLoopContextOf<typeof boundedDrainActor>,
  window: WorkMessage[],
): Promise<void> {
  let windowTotal = 0;
  await runWithLimit(CONCURRENCY_LIMIT, window, async (work) => {
    const result = await processWork(work.value);
    windowTotal += result;
  });

  ctx.state.processed += window.length;
  ctx.state.lastWindowSize = window.length;
  ctx.state.lastWindowTotal = windowTotal;
}

export const registry = setup({ use: { boundedDrainActor } });

Versioned workflow evolution

Use this when workflow structure changes across deployments and old histories must still replay.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const versionedWorkflowActor = actor({
  state: {
    runs: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("validate-v2", async () => {
      ctx.state.runs += 1;
    });

    await ctx.removed("validate-v1", "step");

    await ctx.loop("main-loop-v2", async (loopCtx) => {
        await loopCtx.sleep("idle", 500);
        await loopCtx.step("heartbeat-v2", async () => {
          loopCtx.state.runs += 1;
        });
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { versionedWorkflowActor } });

Checkpoint-friendly loop design

Use this when you need reliable replay and resume semantics across crashes and restarts.

ts
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type PaymentMessage = { id: string; amount: number };

export const checkpointFriendlyActor = actor({
  state: {
    appliedCount: 0,
    totalAmount: 0,
    lastPaymentId: null as string | null,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("payment-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-payment", {
          timeout: 30_000,
        });

        if (!message) return;
        const payment = message.body as PaymentMessage;

        await loopCtx.rollbackCheckpoint("apply-payment-checkpoint");

        const plan = (await loopCtx.step("build-plan", async () =>
          buildPaymentPlan(payment),
        )) as { paymentId: string; amount: number };

        await loopCtx.step("apply-side-effects", async () => {
          loopCtx.state.appliedCount += 1;
          loopCtx.state.totalAmount += plan.amount;
          loopCtx.state.lastPaymentId = plan.paymentId;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function buildPaymentPlan(payment: PaymentMessage): {
  paymentId: string;
  amount: number;
} {
  return {
    paymentId: payment.id,
    amount: payment.amount,
  };
}

export const registry = setup({ use: { checkpointFriendlyActor } });

Migrations

  • Keep workflow entry names stable once deployed.
  • If an old entry was removed or renamed, call ctx.removed(name, originalType).
  • This keeps replay compatible across deployments.

Step-only access to actor APIs

state, vars, db, client(), and connection/event APIs are only valid inside ctx.step(...) callbacks.

Use non-step workflow code for orchestration only: queue waits, sleeps, loops, joins, races, and rollback boundaries. Keep actor-local side effects in steps.

Debugging

  • GET /inspector/workflow-history returns workflow history status for an actor.
  • Response includes isWorkflowEnabled and history.
  • In non-dev mode, inspector endpoints require authorization.

Recommendations

  • Prefer queue-driven loops for long-lived workflows.
  • Structure long-lived workflows with setup and teardown around the main loop.
  • Keep actor state changes and side effects inside steps.
  • Store workflow progress in state and broadcast updates as progress changes.
  • Use timeouts and rollback for external side effects.