frameworks/motia/MIGRATION_GUIDE.md
This guide covers migrating from Motia v0.17.x to the Motia 1.0-RC framework. It is organized by area of concern so you can migrate incrementally.
Motia now requires the iii engine to run. The iii engine is the Rust-powered runtime that manages all modules (streams, state, API, queues, cron, observability) and orchestrates the SDK process. All adapter and infrastructure configuration is done through iii via a config.yaml file -- the SDK itself no longer handles any of this.
Install iii from https://iii.dev before proceeding with the migration.
The old motia.config.ts (using defineConfig) is replaced by two files managed by iii:
| Concern | Old | New |
|---|---|---|
| Project config & plugins | motia.config.ts (defineConfig({...})) | Removed (handled by iii engine via config.yaml) |
| Module/adapter config | N/A | config.yaml (iii engine config) |
| Auth & hooks | streamAuth in motia.config.ts | motia.config.ts (simplified, exports only auth hooks) |
| Build externals | .esbuildrc.json | Removed |
| Workbench UI layout | motia-workbench.json | Removed (see Workbench, Plugins, and Console) |
Old -- motia.config.ts:
import path from 'node:path'
import { defineConfig, type MotiaPlugin, type MotiaPluginContext, type StreamAuthRequest } from '@motiadev/core'
import bullmqPlugin from '@motiadev/plugin-bullmq/plugin'
import endpointPlugin from '@motiadev/plugin-endpoint/plugin'
import examplePlugin from '@motiadev/plugin-example/plugin'
import logsPlugin from '@motiadev/plugin-logs/plugin'
import observabilityPlugin from '@motiadev/plugin-observability/plugin'
import statesPlugin from '@motiadev/plugin-states/plugin'
import { z } from 'zod'
const streamAuthContextSchema = z.object({
userId: z.string(),
permissions: z.enum(['nodejs', 'python']).optional(),
})
const demoTokens: Record<string, z.infer<typeof streamAuthContextSchema>> = {
'token-nodejs': { userId: 'anderson', permissions: 'nodejs' },
'token-python': { userId: 'sergio', permissions: 'python' },
}
const extractAuthToken = (request: StreamAuthRequest): string | undefined => {
const protocol = request.headers['sec-websocket-protocol'] as string | undefined
if (protocol?.includes('Authorization')) {
const [, token] = protocol.split(',')
if (token) return token.trim()
}
try {
const url = new URL(request.url)
return url.searchParams.get('authToken') ?? undefined
} catch {
return undefined
}
}
export default defineConfig({
plugins: [
observabilityPlugin,
statesPlugin,
endpointPlugin,
logsPlugin,
examplePlugin,
bullmqPlugin,
],
streamAuth: {
contextSchema: z.toJSONSchema(streamAuthContextSchema),
authenticate: async (request: StreamAuthRequest) => {
const token = extractAuthToken(request)
if (!token) return null
const tokenData = demoTokens[token]
if (!tokenData) throw new Error(`Invalid token: ${token}`)
return tokenData
},
},
})
New -- config.yaml (development):
modules:
# ── Stream Module ──────────────────────────────────────────────────────
# Manages real-time data streams with WebSocket support.
# Adapters: KvStore (file_based | in_memory), RedisAdapter
- class: modules::stream::StreamModule
config:
port: ${STREAM_PORT:3112} # WebSocket server port (default: 3112)
host: 0.0.0.0 # Host address to bind (default: 0.0.0.0)
# auth_function: motia.stream.authenticate # Reference to auth fn in motia.config.ts
adapter:
class: modules::stream::adapters::KvStore
config:
store_method: file_based # "file_based" or "in_memory" (default: in_memory)
file_path: ./data/stream_store # Directory for file-based persistence
# save_interval_ms: 5000 # Disk flush interval in ms (default: 5000)
# ── State Module ───────────────────────────────────────────────────────
# Key-value state storage grouped by namespace.
# Adapters: KvStore (file_based | in_memory), RedisAdapter
- class: modules::state::StateModule
config:
adapter:
class: modules::state::adapters::KvStore
config:
store_method: file_based # "file_based" or "in_memory" (default: in_memory)
file_path: ./data/state_store.db # Directory for file-based persistence
# save_interval_ms: 5000 # Disk flush interval in ms (default: 5000)
# ── REST API Module ────────────────────────────────────────────────────
# Serves HTTP endpoints defined by step triggers.
- class: modules::api::RestApiModule
config:
port: 3111 # HTTP server port (default: 3111)
host: 0.0.0.0 # Host address to bind (default: 0.0.0.0)
default_timeout: 30000 # Request timeout in ms (default: 30000)
concurrency_request_limit: 1024 # Max concurrent requests (default: 1024)
cors:
allowed_origins: # Origins allowed to make cross-origin requests
- http://localhost:3000
- http://localhost:5173
allowed_methods: # HTTP methods allowed in CORS preflight
- GET
- POST
- PUT
- DELETE
- OPTIONS
# ── OpenTelemetry Module ───────────────────────────────────────────────
# Observability: distributed traces, metrics, and structured logs.
# Exporter types — traces: "otlp", "memory", "both"
# metrics: "memory", "otlp"
# logs: "memory", "otlp", "both"
- class: modules::observability::OtelModule
config:
enabled: true # Enable tracing (default: false)
service_name: my-service # Service name reported to OTEL collector
service_version: 0.1.0 # Service version (OTEL semantic convention)
# service_namespace: production # Service namespace (OTEL semantic convention)
exporter: memory # Trace exporter: "otlp", "memory", or "both" (default: otlp)
# endpoint: http://localhost:4317 # OTLP gRPC endpoint (for otlp/both exporters)
sampling_ratio: 1.0 # 0.0 to 1.0, fraction of traces to sample (1.0 = all)
memory_max_spans: 10000 # Max spans in memory (for memory/both exporters)
metrics_enabled: true # Enable metrics collection (default: false)
metrics_exporter: memory # Metrics exporter: "memory" or "otlp" (default: memory)
# metrics_retention_seconds: 3600 # Metrics retention in seconds (default: 3600)
# metrics_max_count: 10000 # Max metric data points in memory (default: 10000)
logs_enabled: true # Enable structured log storage (default: false)
logs_exporter: memory # Logs exporter: "memory", "otlp", or "both" (default: memory)
logs_max_count: 1000 # Max log entries in memory (default: 1000)
# logs_retention_seconds: 3600 # Logs retention in seconds (default: 3600)
# logs_sampling_ratio: 1.0 # Fraction of logs to keep, 0.0-1.0 (default: 1.0)
# logs_console_output: true # Also output OTEL logs to console (default: true)
# level: info # Engine log level: trace, debug, info, warn, error
# format: default # Log format: "default" (human-readable) or "json"
# ── Queue Module ───────────────────────────────────────────────────────
# Message queues for async step-to-step communication via enqueue().
# Adapters: BuiltinQueueAdapter, RedisAdapter, RabbitMQAdapter
- class: modules::queue::QueueModule
config:
adapter:
class: modules::queue::BuiltinQueueAdapter # In-process queue (no external deps)
# For Redis: class: modules::queue::RedisAdapter
# config: { redis_url: "redis://localhost:6379" }
# For RabbitMQ: class: modules::queue::RabbitMQAdapter
# config: { amqp_url: "amqp://localhost:5672" }
# ── PubSub Module ─────────────────────────────────────────────────────
# Internal publish/subscribe messaging between engine components.
# Adapters: LocalAdapter, RedisAdapter
- class: modules::pubsub::PubSubModule
config:
adapter:
class: modules::pubsub::LocalAdapter # In-process pubsub (no external deps)
# For Redis: class: modules::pubsub::RedisAdapter
# config: { redis_url: "redis://localhost:6379" }
# ── Cron Module ────────────────────────────────────────────────────────
# Schedules and executes cron-based triggers.
# Adapters: KvCronAdapter, RedisCronAdapter
- class: modules::cron::CronModule
config:
adapter:
class: modules::cron::KvCronAdapter # KV-based scheduler (no external deps)
# For Redis: class: modules::cron::RedisCronAdapter
# config: { redis_url: "redis://localhost:6379" }
# ── Exec Module ────────────────────────────────────────────────────────
# Manages the SDK process lifecycle. Watches files and restarts on change.
- class: modules::shell::ExecModule
config:
watch: # Glob patterns to watch for hot-reload
- steps/**/*.ts
- motia.config.ts
exec: # Commands to run as the SDK process (in order)
- npx motia dev
- bun run --enable-source-maps dist/index-dev.js
New -- motia.config.ts (auth/hooks):
import type { AuthenticateStream } from 'motia'
export const authenticateStream: AuthenticateStream = async (req, context) => {
context.logger.info('Authenticating stream', { req })
return { context: { userId: 'sergio' } }
}
| Old | New |
|---|---|
motia dev | iii |
motia build | motia build (unchanged) |
motia-workbench.json.motia/ directory (auto-generated state) — Warning: this will delete any local stream and state data persisted by the old engine; back up first if neededNote: motia.config.ts is not deleted -- it is simplified. Remove the defineConfig wrapper, all plugin imports, and the plugins array. Keep only the authentication hook exports (see the "New" example above).
The new Motia does not enforce a specific module system or runtime. You are free to use CommonJS, ESM, Node.js, Bun, or any compatible runtime. The framework adapts to your project's setup.
Motia now has first-class support for Bun in addition to Node.js. You can choose whichever runtime fits your project:
| Runtime | Dev Command Example | Production Example |
|---|---|---|
| Node.js | npx motia dev | node dist/index-production.js |
| Bun | bun run dist/index-dev.js | bun run --enable-source-maps dist/index-production.js |
You can use either CommonJS or ESM -- the choice is yours. If you want to adopt ESM (recommended for Bun compatibility and modern tooling), update your project:
package.json -- optionally add:
{
"type": "module"
}
tsconfig.json -- optionally change:
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "bundler",
"moduleDetection": "force"
}
}
If you prefer to stay on CommonJS, that works too. Motia does not force a migration.
This is the most important conceptual change in new Motia: there are no longer separate "step types". In the old version, you had API steps, Event steps, and Cron steps -- each with its own config type (ApiRouteConfig, EventConfig, CronConfig) and its own type field. In the new version, everything is just a Step. What used to determine the "type" of a step is now expressed through its triggers -- an array of trigger definitions that describe how and when the step is activated.
A single step can have multiple triggers of different kinds (HTTP, queue, cron, state, stream), making it far more flexible than the old one-type-per-step model.
| Old | New |
|---|---|
import { ... } from '@motiadev/core' | import { ... } from 'motia' |
ApiRouteConfig | StepConfig |
EventConfig | StepConfig |
CronConfig | StepConfig |
| `type: 'api' | 'event' |
emits: ['topic'] | enqueues: ['topic'] |
subscribes: ['topic'] | Moved into trigger: { type: 'queue', topic: '...' } |
virtualEmits | virtualEnqueues |
virtualSubscribes | virtualSubscribes (unchanged) |
| Old | New |
|---|---|
Handlers['StepName'] | Handlers<typeof config> |
StepHandler<typeof config> | Handlers<typeof config> |
ctx.emit({ topic, data }) | ctx.enqueue({ topic, data }) |
The new version uses as const satisfies StepConfig for full type inference:
// Old
export const config: ApiRouteConfig = {
type: 'api',
name: 'MyStep',
// ...
}
export const handler: Handlers['MyStep'] = async (req, ctx) => { ... }
// New
export const config = {
name: 'MyStep',
// ...
triggers: [{ type: 'http', method: 'GET', path: '/my-step' }],
enqueues: [],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (req, ctx) => { ... }
In the old version these were "API steps" -- a dedicated step type with type: 'api'. In the new version, HTTP is just a trigger type (type: 'http') on a regular step.
import { ApiRouteConfig, Handlers } from 'motia'
import { z } from 'zod'
const bodySchema = z.object({
name: z.string(),
email: z.string(),
})
export const config: ApiRouteConfig = {
type: 'api',
name: 'CreateUser',
description: 'Create a new user',
method: 'POST',
path: '/users',
bodySchema,
responseSchema: {
200: z.object({ id: z.string() }),
400: z.object({ error: z.string() }),
},
emits: ['user-created'],
flows: ['User Flow'],
middleware: [coreMiddleware, validateBearerToken],
}
export const handler: Handlers['CreateUser'] = async (req, { emit, logger }) => {
const { name, email } = req.body
logger.info('Creating user', { name, email })
await emit({
topic: 'user-created',
data: { name, email },
})
return { status: 200, body: { id: 'user-123' } }
}
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
const bodySchema = z.object({
name: z.string(),
email: z.string(),
})
export const config = {
name: 'CreateUser',
description: 'Create a new user',
flows: ['user-flow'],
triggers: [
{
type: 'http',
method: 'POST',
path: '/users',
bodySchema,
responseSchema: {
200: z.object({ id: z.string() }),
400: z.object({ error: z.string() }),
},
middleware: [validateBearerToken],
},
],
enqueues: ['user-created'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (req, { enqueue, logger }) => {
const { name, email } = req.body
logger.info('Creating user', { name, email })
await enqueue({
topic: 'user-created',
data: { name, email },
})
return { status: 200, body: { id: 'user-123' } }
}
type: 'api' is now type: 'http' inside a trigger object.method, path, bodySchema, responseSchema, middleware all move inside the trigger.emits becomes enqueues at the config level.emit() becomes enqueue() in the handler context.ApiRouteConfig to StepConfig with as const satisfies.emit() calls, some projects used type as the field name (e.g., emit({ type: 'topic-name', data })) while others used topic. The new enqueue() always uses topic: enqueue({ topic: 'topic-name', data }).The new version provides an http() helper for cleaner trigger definitions:
import { http } from 'motia'
export const config = {
name: 'CreateTodo',
flows: ['todo-app'],
triggers: [
http('POST', '/todo', {
bodySchema: z.object({ description: z.string() }),
responseSchema: {
200: todoSchema,
400: z.object({ error: z.string() }),
},
}),
],
enqueues: [],
} as const satisfies StepConfig
Note: Both the TypeScript and Python SDKs use
http()as the primary trigger helper. Both also exportapi()as a deprecated alias — it works identically but should be updated tohttp()for future compatibility.
The concept of "event steps" that subscribe to topics no longer exists as a step type. Instead, subscribing to a topic is now a queue trigger on a regular step.
import { EventConfig, Handlers } from '@motiadev/core'
import { z } from 'zod'
export const config: EventConfig = {
type: 'event',
name: 'DeployEnvironment',
description: 'Creates or updates an environment',
subscribes: ['deploy-environment-v2'],
emits: ['deploy-version-v2'],
input: z.object({
deploymentId: z.string(),
envVars: z.record(z.string()),
}),
flows: ['Deployment'],
}
export const handler: Handlers['DeployEnvironment'] = async (data, { logger, emit, streams }) => {
logger.info('Deploying environment', { deploymentId: data.deploymentId })
// ... business logic ...
await emit({
topic: 'deploy-version-v2',
data: { deploymentId: data.deploymentId },
})
}
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'DeployEnvironment',
description: 'Creates or updates an environment',
flows: ['deployment'],
triggers: [
{
type: 'queue',
topic: 'deploy-environment-v2',
input: z.object({
deploymentId: z.string(),
envVars: z.record(z.string()),
}),
},
],
enqueues: ['deploy-version-v2'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { logger, enqueue, streams }) => {
logger.info('Deploying environment', { deploymentId: input.deploymentId })
// ... business logic ...
await enqueue({
topic: 'deploy-version-v2',
data: { deploymentId: input.deploymentId },
})
}
| Old | New |
|---|---|
type: 'event' | triggers: [{ type: 'queue', topic, input }] |
subscribes: ['topic'] | topic field inside trigger |
emits: ['topic'] | enqueues: ['topic'] |
input: schema | input: schema inside trigger (or wrap with jsonSchema()) |
infrastructure: {...} at config root | config: {...} inside the queue trigger |
emit({ topic, data }) | enqueue({ topic, data }) |
emit({ type: 'topic' }) (some old projects) | enqueue({ topic: 'topic' }) (field key standardized to topic) |
Handler receives data directly | Handler receives input directly |
jsonSchema() WrapperWhen the input schema needs JSON schema conversion for the engine, use the jsonSchema() wrapper:
import { jsonSchema } from 'motia'
triggers: [
{
type: 'queue',
topic: 'notification',
input: jsonSchema(
z.object({
email: z.string(),
templateId: z.string(),
})
),
},
]
import { CronConfig, Handlers } from '@motiadev/core'
export const config: CronConfig = {
type: 'cron',
name: 'DailyMetricsCollection',
description: 'Collects metrics daily at midnight',
cron: '0 5 * * *',
emits: ['collect-metrics'],
flows: ['Metrics Collection Flow'],
}
export const handler: Handlers['DailyMetricsCollection'] = async ({ logger, emit }) => {
logger.info('Collecting metrics')
await emit({
topic: 'collect-metrics',
data: { targetDate: new Date().toISOString() },
})
}
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'DailyMetricsCollection',
description: 'Collects metrics daily at midnight',
flows: ['metrics-collection-flow'],
triggers: [
{
type: 'cron',
expression: '0 0 5 * * *',
},
],
enqueues: ['collect-metrics'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
logger.info('Collecting metrics')
await enqueue({
topic: 'collect-metrics',
data: { targetDate: new Date().toISOString() },
})
}
| Old | New |
|---|---|
type: 'cron' at config root | triggers: [{ type: 'cron', expression }] |
cron: '0 5 * * *' (5-field) | expression: '0 0 5 * * *' (6-field: prepend seconds; 7th year field optional) |
Handler: async ({ logger, emit }) | Handler: async (input, { logger, enqueue }) |
emit() | enqueue() |
The new engine uses a 6-field cron expression (7th year field is optional):
┌──────────── second (0-59)
│ ┌────────── minute (0-59)
│ │ ┌──────── hour (0-23)
│ │ │ ┌────── day of month (1-31)
│ │ │ │ ┌──── month (1-12)
│ │ │ │ │ ┌── day of week (0-6, Sun=0)
│ │ │ │ │ │ ┌ year (optional)
│ │ │ │ │ │ │
* * * * * * *
Conversion examples:
| Old (5-field) | New (6-field) | Meaning |
|---|---|---|
0 5 * * * | 0 0 5 * * * | Daily at 5:00 AM |
0 2 * * * | 0 0 2 * * * | Daily at 2:00 AM |
*/5 * * * * | 0 */5 * * * * | Every 5 minutes |
0 0 * * 0 | 0 0 0 * * 0 | Weekly on Sunday at midnight |
Stream definitions remain similar but the access API has changed.
Old:
import { StreamConfig } from '@motiadev/core'
import { z } from 'zod'
export const config: StreamConfig = {
name: 'deployment',
baseConfig: { storageType: 'default' },
schema: z.object({
id: z.string(),
status: z.enum(['pending', 'progress', 'completed', 'failed']),
message: z.string().optional(),
}),
}
New:
import type { StreamConfig } from 'motia'
import { z } from 'zod'
export const config: StreamConfig = {
name: 'deployment',
baseConfig: { storageType: 'default' },
schema: z.object({
id: z.string(),
status: z.enum(['pending', 'progress', 'completed', 'failed']),
message: z.string().optional(),
}),
// New: lifecycle hooks (optional)
onJoin: async (subscription, context, authContext) => {
context.logger.info('Client joined stream', { subscription, authContext })
return { unauthorized: false }
},
onLeave: async (subscription, context, authContext) => {
context.logger.info('Client left stream', { subscription, authContext })
},
}
| Operation | Old | New |
|---|---|---|
| Get | streams.name.get(id, key) | streams.name.get(groupId, id) |
| Set | streams.name.set(id, key, value) | streams.name.set(groupId, id, value) |
| Update | N/A | streams.name.update(groupId, id, UpdateOp[]) |
| Delete | streams.name.delete(id, key) | streams.name.delete(groupId, id) |
The parameter naming changed from (id, key) to (groupId, id) to better reflect the data model: a stream is partitioned by groups, and within each group items are identified by id.
UpdateOpThe new version supports atomic update operations:
import type { UpdateOp } from 'motia'
await streams.deployment.update('merge-groups', traceId, [
{ type: 'increment', path: 'completedSteps', by: 1 },
{ type: 'set', path: 'status', value: 'progress' },
{ type: 'decrement', path: 'retries', by: 1 },
])
Available operations:
| Type | Fields | Description |
|---|---|---|
set | path, value | Set a field to a value (overwrite) |
merge | path (optional), value | Merge an object into the existing value (object-only) |
increment | path, by | Increment a numeric field |
decrement | path, by | Decrement a numeric field |
remove | path | Remove a field entirely |
Old:
const streamData = await streams.deployment.get(deploymentId, 'data')
streamData.status = 'completed'
streamData.message = 'Done'
await streams.deployment.set(deploymentId, 'data', streamData)
New:
await streams.deployment.update('data', deploymentId, [
{ type: 'set', path: 'status', value: 'completed' },
{ type: 'set', path: 'message', value: 'Done' },
])
Steps can now react to stream changes. The handler receives a StreamWrapperMessage with the following shape:
type StreamWrapperMessage<TStreamData> = {
type: 'stream'
timestamp: number
streamName: string
groupId: string
id?: string
event: StreamCreate<TStreamData> | StreamUpdate<TStreamData> | StreamDelete<TStreamData> | StreamEvent
}
Where the event field contains one of:
{ type: 'create', data: TStreamData } -- a new item was created{ type: 'update', data: TStreamData } -- an existing item was updated{ type: 'delete', data: TStreamData } -- an item was deleted{ type: 'event', data: { type: string, data: TEventData } } -- a custom eventDefine a stream trigger with a condition to filter which stream messages activate the step:
triggers: [
{
type: 'stream',
streamName: 'deployment',
groupId: 'data',
condition: (input: StreamWrapperMessage) => input.event.type === 'update',
},
]
State provides key-value storage grouped by a namespace. The core get, set, and list operations remain the same as before. The new version introduces two important additions: atomic updates via the update method, and state triggers.
// Set a value
await ctx.state.set('orders', orderId, orderData)
// Get a value
const order = await ctx.state.get<Order>('orders', orderId)
// List all values in a group
const allOrders = await ctx.state.list<Order>('orders')
update()Instead of read-modify-write patterns, you can now perform atomic updates on state entries using UpdateOp[]:
await ctx.state.update<Order>('orders', orderId, [
{ type: 'increment', path: 'completedSteps', by: 1 },
{ type: 'set', path: 'status', value: 'shipped' },
{ type: 'decrement', path: 'retries', by: 1 },
])
This is the same UpdateOp interface used in streams (see Streams). It eliminates race conditions that can occur with manual get-then-set patterns.
Available operations:
| Type | Fields | Description |
|---|---|---|
set | path, value | Set a field to a value (overwrite) |
merge | path (optional), value | Merge an object into the existing value (object-only) |
increment | path, by | Increment a numeric field |
decrement | path, by | Decrement a numeric field |
remove | path | Remove a field entirely |
This is a brand new feature. Steps can now react to state changes by using a state trigger. The trigger includes a condition function that filters which state changes should activate the step:
import type { StateTriggerInput } from 'motia'
export const config = {
name: 'OnAllStepsComplete',
triggers: [
{
type: 'state',
condition: (input: StateTriggerInput<MyType>) => {
return (
input.group_id === 'tasks' &&
!!input.new_value &&
input.new_value.totalSteps === input.new_value.completedSteps
)
},
},
],
flows: ['my-flow'],
} as const satisfies StepConfig
The handler receives the state change event as its first argument, including new_value, old_value, item_id, and group_id. This enables powerful reactive patterns -- for example, triggering a step when a parallel merge completes, without polling or manual coordination.
In the old version, middleware was defined as ApiMiddleware functions and attached to step configs:
// src/middleware/bearerToken.middleware.ts
import { ApiMiddleware } from '@motiadev/core'
export const validateBearerToken: ApiMiddleware = async (req, ctx, next) => {
const authToken = req.headers['authorization'] as string
if (!authToken) {
return { status: 401, body: { error: 'Unauthorized' } }
}
// validate token...
req.tokenInfo = decoded
return next()
}
// In step config:
export const config: ApiRouteConfig = {
type: 'api',
name: 'GetUser',
middleware: [coreMiddleware, validateBearerToken],
// ...
}
The middleware field has moved from the config root into the HTTP trigger object:
export const config = {
name: 'GetUser',
flows: ['users'],
triggers: [
{
type: 'http',
method: 'GET',
path: '/users',
middleware: [validateBearerToken],
},
],
enqueues: [],
} as const satisfies StepConfig
Stream authentication is configured separately in motia.config.ts via authenticateStream.
You can also use shared utility functions called directly within handlers as an alternative:
// Alternative: handler-level auth
export async function requireAuth(request: ApiRequest<any>): Promise<TokenData> {
const authToken = request.headers['authorization'] as string
if (!authToken) {
throw new HttpError(401, 'Unauthorized')
}
const [, token] = authToken.split(' ')
return jwt.verify(token, env.JWT_SECRET) as TokenData
}
export const handler: Handlers<typeof config> = async (request, { logger }) => {
const tokenData = await requireAuth(request)
// ... rest of handler
}
A single step can now respond to multiple trigger types:
export const config = {
name: 'ProcessOrder',
flows: ['orders'],
triggers: [
{ type: 'queue', topic: 'order.created', input: orderSchema },
{ type: 'http', method: 'POST', path: '/orders/manual', bodySchema: orderSchema },
{ type: 'cron', expression: '* * * * *' },
],
enqueues: ['order.processed'],
} as const satisfies StepConfig
step() HelperFor multi-trigger steps, the step() helper provides ctx.getData() and ctx.match():
import { http, queue, step } from 'motia'
export const stepConfig = {
name: 'ProcessOrder',
flows: ['orders'],
triggers: [
queue('order.created', { input: orderSchema }),
http('POST', '/orders', { bodySchema: orderSchema }),
],
enqueues: ['notification'],
}
export const { config, handler } = step(stepConfig, async (input, ctx) => {
// ctx.getData() returns the data regardless of trigger type
const data = ctx.getData()
// ctx.match() for trigger-specific handling
return ctx.match({
http: async (request) => {
return { status: 200, body: { success: true } }
},
queue: async (queueInput) => {
ctx.logger.info('Processing from queue', { queueInput })
},
})
})
Triggers can include a condition function that determines whether the step should execute:
triggers: [
{
type: 'queue',
topic: 'order.created',
input: orderSchema,
condition: (input, ctx) => {
return input.amount > 1000 // Only process high-value orders
},
},
{
type: 'http',
method: 'POST',
path: '/orders/manual',
bodySchema: orderSchema,
condition: (input, ctx) => {
if (ctx.trigger.type !== 'http') return false
return input.body.user.verified === true
},
},
]
Shorthand helpers for creating triggers:
import { http, queue } from 'motia'
triggers: [
http('POST', '/todo', { bodySchema: schema, responseSchema: { 200: schema } }),
queue('process-todo', { input: schema }),
]
config.yaml with module definitions (stream, state, api, queue, cron, exec)motia.config.ts for authentication hooks (if needed)motia.config.ts: remove defineConfig, all plugin imports, and the plugins array; keep only auth hook exportsmotia-workbench.json.motia/ directory — Warning: this will delete any local stream and state data persisted by the old engine; back up first if neededmotia dev to iii@motiadev/core imports with motiaApiRouteConfig / EventConfig / CronConfig imports with StepConfigtriggers[] and enqueues[]as const satisfies StepConfig to all configsHandlers['StepName'] with Handlers<typeof config>emit() calls to enqueue()emits config fields to enqueuessubscribes into queue triggersmethod, path, bodySchema, responseSchema, middleware into HTTP triggersinfrastructure from config root into queue triggers as configtype: 'api' to type: 'http' in all triggerscron into cron triggers as expression (prepend seconds; 7th year field is optional)type field from config rootmiddleware field from all step configsvirtualEmits with virtualEnqueues (format changes from [{ topic, label }] to ['topic'])virtualSubscribes as-is (no rename needed)get(id, key) to get(groupId, id)set(id, key, value) to set(groupId, id, value)update(groupId, id, UpdateOp[]) where possibleonJoin / onLeave hooks to stream configs if real-time subscription auth is neededstate.update() with UpdateOp[] to replace manual get-then-set patternsmiddleware arrays from config root into the corresponding HTTP trigger objectscron field to expression inside trigger objectsmotia as a standalone Python package (npm/Node.js no longer required!)config.yaml for the Python runtimemotia-workbench.json.ui.step.ts or noop step files used exclusively for workbench rendering@motiadev/workbench and @motiadev/core from package.json dependencies (replaced by motia)This is a major architectural change. In the old Motia, Python steps were managed by the same Node.js-based Motia runtime. Python files were executed as child processes spawned by the Node runtime, meaning Python developers previously needed Node.js and npm installed to use Motia at all.
In the new Motia, runtimes are fully independent. There is a dedicated Motia Python SDK (motia-py) that runs as its own standalone process, communicating directly with the iii engine. Python developers no longer need Node.js, npm, or any JavaScript tooling whatsoever.
| Aspect | Old | New |
|---|---|---|
| Python execution | Spawned as child process by Node runtime | Independent process managed by iii engine |
| Node.js required for Python? | Yes | No |
| SDK | Single motia npm package handled both | Separate motia-py (Python) and motia (Node) packages |
| Configuration | Shared with Node steps | Own config.yaml ExecModule entry pointing to the Python process |
| File naming | *_step.py | *_step.py (unchanged) |
| Package manager | pip / poetry | uv (recommended) |
Recommended migration order:
- Set up your Python project (
pyproject.tomlwithuv) — see Python Project Setup below- Add the Python ExecModule entry in
config.yaml— see Configuration and Module System for fullconfig.yamlstructure- Rename step files (
*_step.py→*_step.py)- Migrate step configs and handlers one at a time (use the subsections below as reference)
- Verify with the Python Migration Checklist at the end of this section
If your project has both Node and Python steps, you now configure separate ExecModule entries in config.yaml -- one for each runtime:
modules:
- class: modules::shell::ExecModule
config:
watch: # Glob patterns to watch for hot-reload
- steps/**/*.ts
- motia.config.ts
exec: # Commands to run as the SDK process (in order)
- npx motia dev
- bun run --enable-source-maps dist/index-dev.js
- class: modules::shell::ExecModule
config:
watch: # Glob patterns to watch for hot-reload
- steps/**/*.py
exec: # Commands to run as the SDK process (in order)
- uv run motia dev --dir steps
For the complete
config.yamlstructure including stream, state, API, queue, and cron adapter modules, see Section 1: Configuration and Section 2: Module System and Runtime.
Create a pyproject.toml in your project root:
[project]
name = "my-motia-project"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"motia[otel]==1.0.0rc17",
"iii-sdk==0.2.0",
"pydantic>=2.0",
]
[project.optional-dependencies]
dev = ["pytest>=8.0.0"]
[tool.uv]
package = false
Migrating from
requirements.txt: Move your existing dependencies fromrequirements.txtinto thedependencieslist inpyproject.toml. For example, if yourrequirements.txthasopenai>=1.40.0andhttpx>=0.27.0, add them alongside the Motia packages:tomldependencies = [ "motia[otel]==1.0.0rc17", "iii-sdk==0.2.0", "pydantic>=2.0", # Your existing dependencies: "openai>=1.40.0", "httpx>=0.27.0", ]Then delete
requirements.txt—uv syncwill install everything frompyproject.toml.
| Concern | Old | New |
|---|---|---|
| Config type field | "type": "api" / "event" / "cron" | Removed — use triggers list |
| API trigger | "method": "POST", "path": "/foo" at config root | http("POST", "/foo") in triggers |
| Event trigger | "subscribes": ["topic"] at config root | queue("topic", input=schema) in triggers |
| Cron trigger | "cron": "0 5 * * *" at config root | cron("0 0 5 * * *") in triggers (6-field: prepend seconds; 7th year field optional) |
| State trigger | N/A (new in v1.0) | state(condition=fn) in triggers |
| Stream trigger | N/A (new in v1.0) | stream("streamName") in triggers |
Enqueue config (was emits) | "emits": ["topic"] | "enqueues": ["topic"] |
Enqueue function (was emit) | context.emit({"topic": ..., "data": ...}) | ctx.enqueue({"topic": ..., "data": ...}) |
| Input schema location | "input": Schema.model_json_schema() at config root | queue("topic", input=Schema.model_json_schema()) inside trigger |
| Body schema location | "bodySchema": Schema.model_json_schema() at config root | http("POST", "/foo", body_schema=Schema.model_json_schema()) inside trigger |
| File naming | *_step.py | *_step.py (unchanged) |
| State list | context.state.get_group("group") | ctx.state.list("group") |
| Streams | ctx.streams.streamName.get(group_id, id) | Stream("name") module-level declaration |
| Logger | context.logger | ctx.logger |
| Trace ID | context.trace_id | ctx.trace_id |
| Path params | req.get("pathParams", {}).get("id") | request.path_params["id"] |
| Query params | req.get("queryParams", {}) | request.query_params |
| Headers | req.get("headers", {}) | request.headers |
| Labeled enqueues | "emits": [{"topic": "x", "label": "y", "conditional": True}] | "enqueues": [{"topic": "x", "label": "y", "conditional": True}] (same format, key renamed) |
<!-- -->Note: Some older projects used
"type"instead of"topic"as the key inemit()calls (e.g.,context.emit({"type": "topic-name", "data": {...}})). The newenqueue()always uses"topic".
Note on parameter names: The migration examples use
ctxandinput_dataas handler parameter names by convention, but any valid Python names work (e.g.,context,data). The framework identifies handlers by function name (handler) and argument count, not parameter names.
# steps/petstore/api_step.py
from pydantic import BaseModel
class RequestBody(BaseModel):
name: str
category: str
class Bill(BaseModel):
id: str
name: str
category: str
config = {
"type": "api",
"name": "Bill Classifier API Step",
"flows": ["classify-bill"],
"method": "POST",
"path": "/classify-bill",
"bodySchema": RequestBody.model_json_schema(),
"responseSchema": {200: Bill.model_json_schema()},
"emits": ["bill-created"],
}
async def handler(req, context):
body = req.get("body", {})
context.logger.info("Processing API Step", {"body": body})
new_bill = {"id": "bill-123", "name": body["name"], "category": body["category"]}
await context.emit({"topic": "bill-created", "data": new_bill})
return {"status": 200, "body": {**new_bill, "traceId": context.trace_id}}
# steps/petstore/classify_bill_api_step.py
from typing import Any
from motia import ApiRequest, ApiResponse, FlowContext, http
from pydantic import BaseModel
class RequestBody(BaseModel):
name: str
category: str
class Bill(BaseModel):
id: str
name: str
category: str
config = {
"name": "BillClassifierAPI",
"flows": ["classify-bill"],
"triggers": [http("POST", "/classify-bill")],
"enqueues": ["bill-created"],
}
async def handler(request: ApiRequest[Any], ctx: FlowContext[Any]) -> ApiResponse[dict]:
body = request.body
ctx.logger.info("Processing API Step", {"body": body})
new_bill = {"id": "bill-123", "name": body["name"], "category": body["category"]}
await ctx.enqueue({"topic": "bill-created", "data": new_bill})
return ApiResponse(status=200, body={**new_bill, "traceId": ctx.trace_id})
"type": "api" removed -- replaced by http() trigger in triggers list."method" and "path" move from config root into the http() call."emits" becomes "enqueues".context.emit() becomes ctx.enqueue().ApiRequest and returns ApiResponse instead of raw dicts.req.get("body", {}) becomes request.body.*_step.py to *_step.py (e.g., classify_bill_api_step.py).req.get("pathParams", {}).get("id") becomes request.path_params["id"].req.get("queryParams", {}) becomes request.query_params.The http() helper supports additional keyword arguments:
from motia import http, QueryParam
http("POST", "/orders",
body_schema=OrderInput.model_json_schema(),
response_schema={200: OrderResponse.model_json_schema()},
query_params=[QueryParam(name="filter", description="Filter criteria")],
middleware=[auth_middleware],
condition=is_authorized,
)
ApiRequest and ApiResponse FieldsThe ApiRequest object replaces the old raw req dict. Here are all available fields:
| Field | Type | Old equivalent | Description |
|---|---|---|---|
request.body | TBody | None | req.get("body", {}) | Parsed request body |
request.path_params | dict[str, str] | req.get("pathParams", {}) | URL path parameters (e.g., /users/:id → {"id": "123"}) |
request.query_params | dict[str, str | list[str]] | req.get("queryParams", {}) | URL query parameters |
request.headers | dict[str, str | list[str]] | req.get("headers", {}) | HTTP request headers |
The ApiResponse object replaces the old raw return dict:
| Field | Type | Old equivalent | Description |
|---|---|---|---|
status | int | "status" key | HTTP status code |
body | Any | "body" key | Response body |
headers | dict[str, str] | N/A (new) | Response headers (optional) |
Example — GET endpoint with path parameters:
# Old:
async def handler(req, context):
ingestion_id = req.get("pathParams", {}).get("ingestion_id")
return {"status": 200, "body": {"id": ingestion_id}}
# New:
async def handler(request: ApiRequest[Any], ctx: FlowContext[Any]) -> ApiResponse[dict]:
ingestion_id = request.path_params["ingestion_id"]
return ApiResponse(status=200, body={"id": ingestion_id})
Note: Path parameter syntax in routes is unchanged — use
:paramstyle (e.g.,http("GET", "/ingest/:ingestion_id")).
Middleware functions can be attached to API triggers via the middleware parameter:
from typing import Any, Awaitable, Callable
from motia import ApiRequest, ApiResponse, FlowContext
async def auth_middleware(
request: ApiRequest[Any],
ctx: FlowContext[Any],
next: Callable[[], Awaitable[ApiResponse[Any]]],
) -> ApiResponse[Any]:
token = request.headers.get("authorization", "")
if not token.startswith("Bearer "):
return ApiResponse(status=401, body={"error": "Unauthorized"})
return await next()
Steps can configure resource limits and queue behavior:
config = {
"name": "HeavyComputation",
"triggers": [
{
"type": "queue",
"topic": "heavy-job",
"config": {"type": "fifo", "maxRetries": 5},
}
],
}
# steps/petstore/process_food_order_step.py
from pydantic import BaseModel
class Bill(BaseModel):
id: str
name: str
category: str
config = {
"type": "event",
"name": "Classify Bill Step",
"flows": ["classify-bill"],
"subscribes": ["bill-created"],
"emits": ["notification"],
"input": Bill.model_json_schema(),
}
async def handler(input_data, context):
context.logger.info("Processing bill", {"input": input_data})
await context.emit({
"topic": "notification",
"data": {
"email": "[email protected]",
"templateId": "bill-classified",
"templateData": {"billId": input_data["id"]},
},
})
# steps/petstore/process_food_order_step.py
from typing import Any
from motia import FlowContext, queue
from pydantic import BaseModel
class Bill(BaseModel):
id: str
name: str
category: str
config = {
"name": "ClassifyBill",
"flows": ["classify-bill"],
"triggers": [queue("bill-created", input=Bill.model_json_schema())],
"enqueues": ["notification"],
}
async def handler(input_data: dict[str, Any], ctx: FlowContext[Any]) -> None:
ctx.logger.info("Processing bill", {"input": input_data})
await ctx.enqueue({
"topic": "notification",
"data": {
"email": "[email protected]",
"templateId": "bill-classified",
"templateData": {"billId": input_data["id"]},
},
})
"type": "event" removed -- replaced by queue() trigger."subscribes": ["bill-created"] moves into queue("bill-created", ...)."input": schema moves from config root into the queue() call as a keyword argument."emits" becomes "enqueues".context.emit() becomes ctx.enqueue().dict — use input_data["key"] or input_data.get("key") instead of attribute access (args.key). For type-safe access, validate with Pydantic: payload = MyModel.model_validate(input_data).Labeled enqueues — The "enqueues" field supports both simple strings and dicts with metadata, exactly as the old "emits" did:
# Simple format
"enqueues": ["notification", "audit-log"]
# Labeled format (same structure as old "emits")
"enqueues": [
{"topic": "notification", "label": "Send email notification"},
{"topic": "audit-log", "label": "Log to audit trail", "conditional": True},
]
Subscribing to multiple topics — a single step can listen to multiple queue topics by adding multiple triggers:
config = {
"name": "ChessGameMoved",
"flows": ["chess"],
"triggers": [
queue("chess-game-moved", input=InputSchema.model_json_schema()),
queue("chess-game-created", input=InputSchema.model_json_schema()),
],
"enqueues": ["ai-move"],
}
# steps/petstore/state_audit_cron_step.py
config = {
"type": "cron",
"cron": "0 0 * * 1",
"name": "StateAuditJob",
"emits": ["notification"],
"flows": ["classify-bill"],
}
async def handler(context):
state_value = await context.state.get_group("orders_python")
context.logger.info("Auditing state", {"count": len(state_value)})
await context.emit({
"topic": "notification",
"data": {"count": len(state_value)},
})
# steps/petstore/state_audit_cron_step.py
from typing import Any
from motia import FlowContext, cron
config = {
"name": "StateAuditJob",
"flows": ["classify-bill"],
"triggers": [cron("0 0 0 * * 1")],
"enqueues": ["notification"],
}
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
state_value = await ctx.state.list("orders_python")
ctx.logger.info("Auditing state", {"count": len(state_value)})
await ctx.enqueue({
"topic": "notification",
"data": {"count": len(state_value)},
})
"type": "cron" removed -- replaced by cron() trigger."cron": "0 0 * * 1" (5-field) becomes cron("0 0 0 * * 1") (6-field: prepend seconds). The 7th year field is optional — see Cron Expression Format in Section 6 for the full format diagram.async def handler(context) (single arg) to async def handler(input_data, ctx) (two args -- input_data is None for cron).context.state.get_group() becomes ctx.state.list().context.emit() becomes ctx.enqueue().State triggers fire when state data changes. This is a new trigger type — the old Motia did not have state-triggered steps.
# steps/users/on_user_change_step.py
from typing import Any
from motia import FlowContext, StateTriggerInput, state
config = {
"name": "OnUserStateChange",
"description": "React to user state changes",
"triggers": [
state(condition=lambda input, ctx: input.group_id == "users"),
],
"enqueues": ["user.status.changed"],
"flows": ["user-management"],
}
async def handler(input_data: StateTriggerInput, ctx: FlowContext[Any]) -> None:
ctx.logger.info("User state changed", {
"group_id": input_data.group_id,
"item_id": input_data.item_id,
})
old_status = input_data.old_value.get("status") if isinstance(input_data.old_value, dict) else None
new_status = input_data.new_value.get("status") if isinstance(input_data.new_value, dict) else None
await ctx.enqueue({
"topic": "user.status.changed",
"data": {
"user_id": input_data.item_id,
"old_status": old_status,
"new_status": new_status,
},
})
state() trigger accepts an optional condition function to filter which state changes to react to.StateTriggerInput with fields: group_id, item_id, old_value, new_value.state (trigger helper) and StateTriggerInput (input type) from motia.Stream triggers fire when stream data is created, updated, or deleted. This is a new trigger type — distinct from using Stream("name") for CRUD operations.
# steps/todos/on_todo_event_step.py
from typing import Any
from motia import FlowContext, StreamTriggerInput, stream
config = {
"name": "OnTodoStreamEvent",
"description": "React to todo stream events",
"triggers": [
stream("todo"),
],
"enqueues": ["todo.processed"],
"flows": ["todo-app"],
}
async def handler(input_data: StreamTriggerInput, ctx: FlowContext[Any]) -> None:
ctx.logger.info("Stream event", {
"stream_name": input_data.stream_name,
"group_id": input_data.group_id,
"item_id": input_data.id,
"event_type": input_data.event.type,
})
if input_data.event.type == "create":
ctx.logger.info(f"New todo created: {input_data.id}")
elif input_data.event.type == "update":
ctx.logger.info(f"Todo updated: {input_data.id}")
elif input_data.event.type == "delete":
ctx.logger.info(f"Todo deleted: {input_data.id}")
await ctx.enqueue({
"topic": "todo.processed",
"data": {"todo_id": input_data.id, "event_type": input_data.event.type},
})
stream() trigger accepts stream_name (required), plus optional group_id, item_id, and condition to filter events.StreamTriggerInput with fields: stream_name, group_id, id, event (which has type and data)."create", "update", or "delete".stream (trigger helper) and StreamTriggerInput (input type) from motia.A single step can have multiple triggers of different types. The ctx.match() method dispatches to the correct handler based on which trigger fired:
# steps/greetings/summary_step.py
from typing import Any
from motia import ApiRequest, ApiResponse, FlowContext, Stream, http, cron
greetings_stream: Stream[dict[str, Any]] = Stream("greetings")
config = {
"name": "GreetingsSummary",
"description": "Summarize greetings via API or every 5 seconds",
"triggers": [
http("GET", "/greetings/summary"),
cron("*/5 * * * * *"),
],
"enqueues": [],
}
async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
async def _api_handler(request: ApiRequest[Any]) -> ApiResponse[dict]:
greetings = await greetings_stream.get_group("default")
return ApiResponse(status=200, body={"count": len(greetings), "greetings": greetings})
async def _cron_handler() -> None:
greetings = await greetings_stream.get_group("default")
ctx.logger.info("Greetings summary (cron)", {"count": len(greetings)})
return await ctx.match({
"http": _api_handler,
"cron": _cron_handler,
})
ctx.match() Handler Signatures| Key | Handler signature | Receives |
|---|---|---|
"queue" | async (input) -> None | Queue data |
"http" (or "api") | async (request) -> ApiResponse | ApiRequest object |
"cron" | async () -> None | Nothing |
"state" | async (input) -> Any | StateTriggerInput |
"stream" | async (input) -> Any | StreamTriggerInput |
"default" | async (input) -> Any | Raw input (fallback) |
The ctx.trigger attribute (TriggerInfo) provides metadata about which trigger fired:
ctx.trigger.type # "http", "queue", "cron", "state", "stream"
ctx.trigger.topic # queue topic (queue triggers only)
ctx.trigger.path # request path (API triggers only)
ctx.trigger.method # HTTP method (API triggers only)
ctx.trigger.expression # cron expression (cron triggers only)
Type guard methods are also available:
if ctx.is_api():
# Handle API request
elif ctx.is_queue():
# Handle queue event
elif ctx.is_cron():
# Handle cron event
ctx.get_data() HelperThe ctx.get_data() method normalizes input extraction across trigger types:
request.bodyNoneFor steps with many triggers, the MultiTriggerStepBuilder provides a chainable API:
from motia import multi_trigger_step, http, queue, cron
my_step = (
multi_trigger_step({
"name": "MyStep",
"triggers": [queue("events"), http("POST", "/events"), cron("0 */5 * * * *")],
"enqueues": ["processed"],
})
.on_queue(queue_handler)
.on_http(http_handler)
.on_cron(cron_handler)
.handlers()
)
The StepBuilder is also available for single-trigger steps:
from motia import step, queue
my_step = step({
"name": "MyStep",
"triggers": [queue("events")],
}).handle(my_handler)
Every trigger helper (http(), queue(), cron(), state(), stream()) accepts an optional condition parameter — a function that determines whether the handler should run:
from typing import Any
from motia import FlowContext, queue
def is_high_value(input_data: Any, ctx: FlowContext[Any]) -> bool:
data = input_data or {}
return data.get("amount", 0) > 1000
config = {
"name": "HighValueOrders",
"triggers": [queue("order.created", condition=is_high_value)],
"enqueues": ["order.processed"],
}
Conditions can also be async and are supported on all trigger types:
http("POST", "/orders/premium", condition=api_premium_check)
cron("0 0 9 * * *", condition=is_business_hours)
state(condition=lambda input, ctx: input.group_id == "users")
stream("todo", condition=lambda input, ctx: input.event.type == "create")
Stream access changed from ctx.streams attribute access to module-level Stream("name") declarations.
# Streams accessed directly from the handler context
async def handler(input, ctx):
# Read from stream via ctx.streams
move_stream = await ctx.streams.chessGameMove.get(game_id, move_id)
# Modify and write back
move_stream["evaluation"] = evaluation
await ctx.streams.chessGameMove.set(game_id, move_id, move_stream)
from typing import Any
from motia import FlowContext, Stream, queue
from pydantic import BaseModel, Field
# Declare streams at module level (NOT inside the handler)
chess_game_move_stream: Stream[dict[str, Any]] = Stream("chessGameMove")
class EvaluateInput(BaseModel):
gameId: str = Field(description="The ID of the game")
moveId: str = Field(description="The ID of the move")
fenBefore: str = Field(description="FEN before the move")
fenAfter: str = Field(description="FEN after the move")
config = {
"name": "EvaluatePlayerMove",
"flows": ["chess"],
"triggers": [queue("evaluate-player-move", input=EvaluateInput.model_json_schema())],
"enqueues": [],
}
async def handler(input_data: dict[str, Any], ctx: FlowContext[Any]) -> None:
payload = EvaluateInput.model_validate(input_data)
# Read from stream using module-level Stream object
move = await chess_game_move_stream.get(payload.gameId, payload.moveId)
# Modify and write back
move["evaluation"] = {"score": 0.5}
await chess_game_move_stream.set(payload.gameId, payload.moveId, move)
ctx.streams.streamName -- instead, declare a Stream("name") at module level.ctx).Stream("chessGameMove") must match the name field in the corresponding .stream.ts config.Stream operations available in Python:
stream: Stream[dict] = Stream("myStream")
# Get a single item
item = await stream.get(group_id, item_id) # returns dict | None
# Set (create or update) an item
await stream.set(group_id, item_id, data)
# List all items in a group (aliases: list() and get_group())
items = await stream.list(group_id)
# Delete an item
await stream.delete(group_id, item_id)
# Atomic update with operations
await stream.update(group_id, item_id, [
{"op": "set", "path": "/status", "value": "completed"},
{"op": "increment", "path": "/count", "value": 1},
])
# List all group IDs in the stream
groups = await stream.list_groups()
The state API is accessed via ctx.state in handlers. The main change is get_group() → list():
# Read all items in a group
orders = await context.state.get_group("orders_python")
# Get a single item
order = await context.state.get("orders", order_id)
# Set an item
await context.state.set("orders", order_id, order_data)
# Read all items in a group
orders = await ctx.state.list("orders_python")
# Get a single item (unchanged)
order = await ctx.state.get("orders", order_id)
# Set an item (unchanged)
await ctx.state.set("orders", order_id, order_data)
# Delete an item (unchanged)
await ctx.state.delete("orders", order_id)
Additional state operations available in the new SDK:
# Atomic update with operations
await ctx.state.update("orders", order_id, [
{"op": "set", "path": "/status", "value": "shipped"},
])
# Clear all items in a scope
await ctx.state.clear("orders")
# List all scope IDs
scopes = await ctx.state.list_groups()
All Motia imports come from the motia package:
# Trigger helpers
from motia import http, queue, cron, state, stream # also: api (deprecated alias for http)
# Core types
from motia import ApiRequest, ApiResponse, FlowContext
from motia import Stream, StreamConfig
# Trigger input types (for state and stream trigger handlers)
from motia import StateTriggerInput, StreamTriggerInput, StreamEvent
# Config types
from motia import StepConfig, TriggerInfo, QueryParam
# Step builders (alternative to config dict + handler function)
from motia import step, StepBuilder
from motia import multi_trigger_step, MultiTriggerStepBuilder
# Middleware
from motia import ApiMiddleware
Here is the actual before/after of a real Python step migration (from the ChessArena project):
Before (evaluate_player_move_step.py):
import chess
import chess.engine
import os
from pydantic import BaseModel, Field
class EvaluatePlayerMoveInput(BaseModel):
fenBefore: str = Field(description="The FEN of the game before the move")
fenAfter: str = Field(description="The FEN of the game after the move")
gameId: str = Field(description="The ID of the game")
moveId: str = Field(description="The ID of the move")
player: str = Field(description="The player who made the move")
config = {
"type": "event",
"name": "EvaluatePlayerMove",
"description": "Evaluates the move picked by a player",
"subscribes": ["evaluate-player-move"],
"emits": [],
"flows": ["chess"],
"input": EvaluatePlayerMoveInput.model_json_schema(),
"includeFiles": ["../../lib/stockfish"]
}
async def handler(input: EvaluatePlayerMoveInput, ctx):
logger = ctx.logger
fen_before = input.get("fenBefore")
game_id = input.get("gameId")
move_id = input.get("moveId")
# ... (business logic omitted for brevity) ...
# Streams accessed via ctx.streams
move_stream = await ctx.streams.chessGameMove.get(game_id, move_id)
move_stream["evaluation"] = evaluation
await ctx.streams.chessGameMove.set(game_id, move_id, move_stream)
After (evaluate_player_move_step.py):
import os
from typing import Any, Literal
import chess
import chess.engine
from motia import FlowContext, Stream, queue
from pydantic import BaseModel, Field
# Stream declared at module level
chess_game_move_stream: Stream[dict[str, Any]] = Stream("chessGameMove")
class EvaluatePlayerMoveInput(BaseModel):
fenBefore: str = Field(description="The FEN of the game before the move")
fenAfter: str = Field(description="The FEN of the game after the move")
gameId: str = Field(description="The ID of the game")
moveId: str = Field(description="The ID of the move")
player: Literal["white", "black"] = Field(description="The player who made the move")
config = {
"name": "EvaluatePlayerMove",
"description": "Evaluates the move picked by a player",
"flows": ["chess"],
"triggers": [queue("evaluate-player-move", input=EvaluatePlayerMoveInput.model_json_schema())],
"enqueues": [],
"includeFiles": ["../../lib/stockfish"],
}
async def handler(input_data: dict[str, Any], ctx: FlowContext[Any]) -> None:
logger = ctx.logger
payload = EvaluatePlayerMoveInput.model_validate(input_data)
# ... (business logic omitted for brevity) ...
# Stream accessed via module-level Stream object
move_stream = await chess_game_move_stream.get(payload.gameId, payload.moveId)
move_stream["evaluation"] = evaluation
await chess_game_move_stream.set(payload.gameId, payload.moveId, move_stream)
*_step.py is unchanged.from motia import FlowContext, Stream, queue"type": "event", "subscribes", "emits" -- replaced with triggers: [queue(...)] and "enqueues"async def handler(input, ctx) → async def handler(input_data: dict[str, Any], ctx: FlowContext[Any]) -> Noneinput.get("fenBefore") → payload = EvaluatePlayerMoveInput.model_validate(input_data) then payload.fenBeforectx.streams.chessGameMove.get(...) → module-level Stream("chessGameMove") then chess_game_move_stream.get(...)requirements.txt deleted, replaced by pyproject.toml with motia[otel] and iii-sdkpyproject.toml with motia[otel], iii-sdk, and pydantic dependenciesuv sync to install dependencies — Node.js is no longer requiredconfig.yamlmotia.config.ts, package.json, and tsconfig.json if this is a Python-only project (replaced by config.yaml and pyproject.toml)motia-workbench.json (replaced by iii Console — see Section 13)*_step.py to *_step.pyrequirements.txt if present (replaced by pyproject.toml)"type" field from all step configs"subscribes": [...] with "triggers": [queue(...)]"method" / "path" at config root with "triggers": [http(...)]"cron": "..." at config root with "triggers": [cron("...")]"input": schema from config root into queue("topic", input=schema)"emits" to "enqueues" in all configs"bodySchema" from config root into http("POST", "/path", body_schema=...) trigger"responseSchema" from config root into http("POST", "/path", response_schema=...) trigger"queryParams" from config root into http("GET", "/path", query_params=...) triggercontext.emit() with ctx.enqueue() in all handlerscontext.state.get_group() with ctx.state.list()async def handler(context) to async def handler(input_data, ctx)ApiRequest / ApiResponse typesreq.get("pathParams", {}).get("x") with request.path_params["x"]req.get("queryParams", {}) with request.query_paramsreq.get("headers", {}) with request.headersreturn {"status": N, "body": {...}} with return ApiResponse(status=N, body={...})ctx.streams.streamName.get(...) with module-level Stream("name") declarationsargs.field) with dict access (input_data["field"] or input_data.get("field"))state() trigger and StateTriggerInputstream() trigger and StreamTriggerInputctx.match() for steps with mixed trigger typescondition functions to triggers where input filtering is neededstream.update() for atomic operations where appropriateThe Motia Workbench (the local visual flow editor, configured via motia-workbench.json) has been replaced by the iii Console. The console provides a richer experience for visualizing and managing your flows, traces, and infrastructure.
Refer to the iii quickstart documentation for iii Console installation instructions.
Workbench plugins (custom UI panels and extensions rendered inside the Workbench) have been sunset and are no longer supported. If your project relied on workbench plugins, you will need to find alternative approaches for any custom UI functionality they provided.
.ui.step.ts or noop step files that were used exclusively for workbench rendering.Motia's automatic OpenAPI/Swagger spec generation from HTTP step schemas is currently a work in progress. This feature is not yet available in the new version. If your project relied on generated OpenAPI specs, be aware that this capability will be restored in a future release.