website/src/content/posts/2026-06-16-introducing-the-effect-sdk/page.mdx
Today we're releasing the RivetKit Effect SDK (beta): a first-class Effect SDK for Rivet Actors.
A Rivet Actor is a lightweight, long-lived process with durability: it holds state in memory, persists it, hibernates when idle, and wakes intact.
Effect fits this model almost exactly. Its primitives are built to live and die with one process; on an actor, they outlive it.
If you know Effect, that's the whole idea in a sentence: an actor is a long-lived Scope holding a Ref that survives restarts. The SDK maps each primitive onto the actor without replacing it:
Scope → a lifetime that spans wake to sleep.Ref / SubscriptionRef → persisted, reactive state that survives restart.Layer / Context.Service → dependency injection, with a real engine in tests.Schema → payloads, successes, and failures validated across the wire.Rivet handles the infrastructure for you:
See the crash course for an introduction to Actors.
A common actor written in Effect has three files: the public contract, the server-only implementation, and the caller.
api.ts is the public contract. Actions are standalone values, each with an explicit Schema payload, success, and error type. It carries no implementation, so client code can import it for full type safety without pulling in anything server-side.
import { Action, Actor } from "@rivetkit/effect";
import { Schema } from "effect";
// A typed error in the action's error channel
export class NegativeAmountError extends Schema.TaggedErrorClass<NegativeAmountError>()(
"NegativeAmountError",
{ amount: Schema.Number, message: Schema.String },
) {}
export const Increment = Action.make("Increment", {
payload: { amount: Schema.Number },
success: Schema.Number,
error: NegativeAmountError,
});
export const GetCount = Action.make("GetCount", { success: Schema.Number });
// The definition ties the actions together under one actor name.
export const Counter = Actor.make("Counter", {
actions: [Increment, GetCount],
});
live.ts is the server-side implementation. Counter.toLayer supplies the action handlers through a wake function. It receives the actor's persisted state; state.schema and initialValue declare its shape and starting value. Note that failure is a returned value in the typed error channel, not a throw.
import { Actor, State } from "@rivetkit/effect";
import { Effect, Schema } from "effect";
import { Counter, NegativeAmountError } from "./api.ts";
export const CounterLive = Counter.toLayer(
Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
return Counter.of({
Increment: Effect.fnUntraced(function* ({ payload }) {
// Reject before mutating, so the error path leaves state untouched.
// The failure is a value in the typed error channel, not a throw.
if (payload.amount < 0) {
return yield* new NegativeAmountError({
amount: payload.amount,
message: `increment amount ${payload.amount} must not be negative`,
});
}
// Persisted state via a `SubscriptionRef`-like API
const next = yield* State.updateAndGet(state, (current) => ({
count: current.count + payload.amount,
})).pipe(Effect.orDie);
// Broadcast the new value to every connected client.
rawRivetkitContext.broadcast("newCount", next.count);
return next.count;
}),
GetCount: () => State.get(state).pipe(Effect.map((s) => s.count), Effect.orDie),
});
}),
{
state: {
schema: Schema.Struct({ count: Schema.Number }),
initialValue: () => ({ count: 0 }),
},
name: "Counter",
icon: "calculator",
},
);
client.ts is the caller. Counter.client is a typed accessor whose methods are the actions from api.ts. getOrCreate("hello-world") addresses a specific actor instance by key, creating it if it doesn't exist. The declared error arrives as a real tagged instance you can catchTag, not a string. Client.layer provides the connection to the engine.
import { Client } from "@rivetkit/effect";
import { Effect } from "effect";
import { Counter } from "./api.ts";
const program = Effect.gen(function* () {
// A typed accessor: methods are your actions, fully typed
const counter = (yield* Counter.client).getOrCreate("hello-world");
yield* Effect.log(`count is now ${yield* counter.Increment({ amount: 1 })}`);
yield* Effect.log(`count is now ${yield* counter.Increment({ amount: 5 })}`);
// The declared error arrives as a real tagged instance, caught by tag.
yield* counter.Increment({ amount: -1 }).pipe(
Effect.catchTag("NegativeAmountError", (err) =>
Effect.log(`rejected: ${err.message} (amount ${err.amount})`),
),
);
yield* Effect.log(`final count: ${yield* counter.GetCount()}`);
}).pipe(Effect.provide(Client.layer({ endpoint: "http://127.0.0.1:6420" })));
Registry.test wires the actor up for tests. Same Counter.client, no special-casing.
import { assert, layer } from "@effect/vitest";
import { Registry } from "@rivetkit/effect";
import { Effect, Layer } from "effect";
import { Counter } from "./api.ts";
import { CounterLive } from "./live.ts";
const TestLayer = Registry.test.pipe(
Layer.provideMerge(CounterLive),
Layer.provide(Registry.layer()),
);
layer(TestLayer)("counter", (it) => {
it.effect("increments and reads the count back", () =>
Effect.gen(function* () {
const counter = (yield* Counter.client).getOrCreate("t-increment");
assert.strictEqual(yield* counter.Increment({ amount: 1 }), 1);
assert.strictEqual(yield* counter.Increment({ amount: 5 }), 6);
assert.strictEqual(yield* counter.GetCount(), 6);
}),
);
});
Each section below is one Effect primitive and what an actor adds to it.
In a normal Effect program a Scope lives for one request. An actor wake is a Scope that spans the actor's entire awake lifetime:
Effect.addFinalizer: runs on sleep, so it's your hibernation hook.Effect.forkScoped: background fibers run until the actor sleeps, then cancel automatically.Counter.toLayer(
Effect.fnUntraced(function* ({ state }) {
// Runs on sleep, not on every call
yield* Effect.addFinalizer(() => Effect.log("actor sleeping"));
// Background fiber scoped to the wake; cancelled on sleep
yield* State.changes(state).pipe(
Stream.runForEach((s) =>
Effect.log("state changed", { count: s.count }),
),
Effect.forkScoped,
);
return Counter.of({
/* handlers */
});
}),
{ state },
);
See actor lifecycle for how wake and sleep are scheduled.
State keeps the Ref / SubscriptionRef shape but backs it with persistence:
get / set / update / updateAndGet / changes.State.changes: an Effect Stream of committed changes, a reactive view of durable state.Effect.race / Effect.timeout / interruption compose over a consistent store.// Atomic read-modify-write against persisted, single-writer state
const next = yield* State.updateAndGet(state, (s) => ({ count: s.count + 1 }));
// A live Stream of committed changes, backed by real persistence
yield* State.changes(state).pipe(Stream.runForEach(render), Effect.forkScoped);
This is in-memory state read straight from the actor's process; see actor state for how it persists and when it saves. For larger or queryable data, each actor also has an embedded SQLite database, covered below.
Effect tracks a program's dependencies in its type. You declare a need for a service with Context.Service, reach it with yield*, and satisfy it by providing a Layer; the compiler won't let the program run until every dependency is provided. That's Effect's dependency injection, and actors plug straight into it:
Layer and yield* it in the wake scope or any handler.Layer: wiring a registry is ordinary Layer composition.const GreeterLive = Layer.succeed(
Greeter,
Greeter.of({ greet: (n) => Hi, ${n}! }),
);
const CounterLive = Counter.toLayer( Effect.fnUntraced(function* ({ state }) { const greeter = yield* Greeter; // resolved per wake return Counter.of({ Greet: ({ payload }) => Effect.succeed(greeter.greet(payload.name)), }); }), { state }, );
// Production: a real engine. Tests: an in-process engine, same code. const Live = CounterLive.pipe( Layer.provide(GreeterLive), Layer.provide(Registry.layer({ endpoint, token, namespace })), );
```ts counter.test.ts
import { layer } from "@effect/vitest";
layer(TestLayer)("counter", (it) => {
it.effect("increments", () =>
Effect.gen(function* () {
const counter = (yield* Counter.client).getOrCreate("t");
assert.strictEqual(yield* counter.Increment({ amount: 5 }), 5);
}),
);
});
Every cross-boundary call is serialized, and the SDK places Schema at that boundary:
DateTimeUtc, URL, bigint, and Uint8Array round-trip safely.Schema.TaggedErrorClass (or a Schema.Union of them) arrives on the caller as an instance you can catchTag, not a string or unknown.The first tab declares an action whose failures are typed; the second catches one on the caller as a real instance.
<CodeGroup> ```ts api.ts export const SendMessage = Action.make("SendMessage", { payload: { sender: Schema.String, text: Schema.String }, // Either failure mode flows through the typed error channel error: Schema.Union([MemberNotInRoomError, BannedWordsError]), }); ```yield* room.SendMessage({ sender: "Mallory", text: "hello" }).pipe(
Effect.catchTag("MemberNotInRoomError", (err) =>
Effect.logWarning("message rejected", { name: err.name }),
),
);
Traces cross the actor boundary on their own. An Effect.withSpan in a handler nests under the SDK's server-side span, which nests under the client-side span, so a single call from client to actor and back is one connected trace with no context-propagation plumbing to wire up.
yield* room.SendMessage({ sender: "Admin", text: "Deploying now" }).pipe(
Effect.withSpan("broadcastAnnouncement", {
attributes: { room: room.key },
}),
);
The rest of the actor surface, reached from inside a handler.
Actions. The typed client works actor-to-actor, and a callee's typed error flows into the caller's declared error channel.
// Inside ChatRoom's wake scope
const moderatorClient = yield* Moderator.client;
SendMessage: Effect.fnUntraced(function* ({ payload }) {
const moderator = moderatorClient.getOrCreate([...address.key, "main"]);
// BannedWordsError from Review flows through SendMessage's error channel
yield* moderator.Review({ text: payload.text });
// ...persist and broadcast
}),
SQLite. An embedded, per-instance database, migrated before wake and co-located with the actor.
import { db } from "rivetkit/db";
export const ChatRoomLive = ChatRoom.toLayer(
Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
return ChatRoom.of({
GetHistory: () =>
Effect.tryPromise(() =>
rawRivetkitContext.db.execute(
"SELECT id, sender, text FROM messages ORDER BY id",
),
).pipe(Effect.orDie),
});
}),
{
state,
db: db({
onMigrate: async (client) => {
await client.execute(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
text TEXT NOT NULL
)
`);
},
}),
},
);
Events. Broadcast from a handler to every connected client, no pub/sub service or separate WebSocket server.
SendMessage: Effect.fnUntraced(function* ({ payload }) {
yield* persist(payload);
rawRivetkitContext.broadcast("newMessage", {
sender: payload.sender,
text: payload.text,
});
}),
Queues. Durable, ordered background work; a consumer maps to a forkScoped fiber that lives as long as the actor is awake. Available now through the scheduler while typed wrappers land.
// Dispatch an action to run later, by name, with the same payload a client sends
rawRivetkitContext.schedule.after(1_000, "SendMessage", {
sender: "Admin",
text: `Welcome to the room, ${name}!`,
});
(Uses of rawRivetkitContext are for parts of Rivet that don't have a native Effect API yet.)
One actor that uses all of the above: a custom service for domain rules, persisted state with a forked live change stream, a sleep finalizer, actor-to-actor RPC whose typed error flows through this actor's channel, and SQLite plus broadcast for durable history and realtime. The full runnable version is the chat-room-effect example.
import { Actor, State } from "@rivetkit/effect";
import { Context, DateTime, Effect, Layer, Schema, Stream } from "effect";
import { db } from "rivetkit/db";
import { Moderator } from "../moderator/api.ts";
import { ChatRoom, MemberNotInRoomError } from "./api.ts";
// A custom service used like any other Effect program
export class RoomPolicy extends Context.Service<
RoomPolicy,
{
readonly requireMember: (
members: ReadonlyArray<{ readonly name: string }>,
name: string,
) => Effect.Effect<void, MemberNotInRoomError>;
}
>()("RoomPolicy") {}
export const ChatRoomLive = ChatRoom.toLayer(
Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
const address = yield* Actor.CurrentAddress;
const roomPolicy = yield* RoomPolicy;
const moderatorClient = yield* Moderator.client;
// Finalizers run on sleep
yield* Effect.addFinalizer(() =>
Effect.log("room sleeping", { actorId: address.actorId }),
);
// A live, durable view of state changes, cancelled on sleep
yield* State.changes(state).pipe(
Stream.runForEach((s) =>
Effect.log("members", { count: s.members.length }),
),
Effect.forkScoped,
);
// Compose persisted state with a service-owned domain guard
const ensureMember = (name: string) =>
State.get(state).pipe(
Effect.orDie,
Effect.flatMap((s) =>
roomPolicy.requireMember(s.members, name),
),
);
return ChatRoom.of({
Join: Effect.fnUntraced(function* ({ payload }) {
const next = yield* State.updateAndGet(state, (s) => ({
...s,
members: [...s.members, { name: payload.name }],
})).pipe(Effect.orDie);
rawRivetkitContext.broadcast("memberJoined", {
name: payload.name,
});
return { memberCount: next.members.length };
}),
SendMessage: Effect.fnUntraced(function* ({ payload }) {
yield* ensureMember(payload.sender);
// Actor-to-actor RPC. A BannedWordsError here flows through
// SendMessage's declared error channel.
const moderator = moderatorClient.getOrCreate([
...address.key,
"main",
]);
yield* moderator.Review({ text: payload.text });
yield* Effect.tryPromise(() =>
rawRivetkitContext.db.execute(
"INSERT INTO messages (sender, text) VALUES (?, ?)",
payload.sender,
payload.text,
),
).pipe(Effect.orDie);
rawRivetkitContext.broadcast("newMessage", payload);
}),
Archive: () => Effect.sync(() => rawRivetkitContext.destroy()),
});
}),
{
state: {
schema: Schema.Struct({
members: Schema.Array(Schema.Struct({ name: Schema.String })),
}),
initialValue: () => ({ members: [] }),
},
db: db({
onMigrate: async (client) => {
await client.execute(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
text TEXT NOT NULL
)
`);
},
}),
name: "Chat Room",
icon: "comments",
},
);
Effect AI gives you a provider-agnostic LanguageModel service, and an actor is the perfect place to run it:
Layer: swap a real provider for a mock without touching actor code.The handler persists the user turn, calls the model with the running history, then persists the reply. The full example is ai-agent-effect.
import { Actor, State } from "@rivetkit/effect";
import { Effect, Schema } from "effect";
import { LanguageModel } from "effect/unstable/ai";
import { Agent, Message } from "./api.ts";
export const AgentLive = Agent.toLayer(
Effect.fnUntraced(function* ({ state }) {
return Agent.of({
SendMessage: Effect.fnUntraced(function* ({ payload }) {
// Persist the user turn before calling the model, so the message
// it replies to survives a restart mid-call.
const history = yield* State.updateAndGet(state, (turns) => [
...turns,
{ role: "user", content: payload.content } satisfies Message,
]).pipe(Effect.orDie);
// The handler requires the `LanguageModel` service but never builds
// it. Sending the whole history on every call is the agent's memory.
const response = yield* LanguageModel.generateText({
prompt: toPrompt(history),
}).pipe(Effect.orDie);
// Persist the assistant turn.
yield* State.update(state, (turns) => [
...turns,
{ role: "assistant", content: response.text } satisfies Message,
]).pipe(Effect.orDie);
return response.text;
}),
GetHistory: () => State.get(state).pipe(Effect.orDie),
});
}),
{
// `Message` is Schema.Struct({ role, content }); state is the running log.
state: { schema: Schema.Array(Message), initialValue: () => [] },
name: "Agent",
icon: "robot",
},
);
The Effect SDK is in beta, and the @rivetkit/effect API may change between releases.
The feature support matrix tracks the state of each feature.
Every wake function receives rawRivetkitContext, the underlying RivetKit actor context. It points at the same actor as the typed state argument, so you can mix both freely. Calls through it are not validated by Schema; payloads are typed as in the base RivetKit API.
Counter.toLayer(
Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
return Counter.of({
Increment: Effect.fnUntraced(function* ({ payload }) {
const next = yield* State.updateAndGet(state, (s) => ({
count: s.count + payload.amount,
}));
rawRivetkitContext.broadcast("newCount", next.count); // untyped, full access
return next.count;
}),
});
}),
{ state },
);
The SDK is shaped by what you build with it. Join our Discord to tell us what to wrap next and to contribute.
npm install rivetkit @rivetkit/effect effect @effect/platform-node
hello-world-effect, chat-room-effect, and ai-agent-effect