frameworks/motia/HANDLER_MIGRATION_GUIDE.md
This guide covers two major migration areas:
- HTTP handler signature: from (req, ctx) to MotiaHttpArgs-based { request, response } destructuring, including SSE support.
- Context API: state, enqueue, logger, and streams have been removed from FlowContext and are now standalone imports.

For a complete migration from Motia v0.17.x to 1.0-RC, see the full migration guide.
FlowContext (the second argument to handlers, commonly called ctx) no longer contains state, enqueue, logger, or streams. These are now standalone imports from 'motia' or from stream files.
| Aspect | Old | New |
|---|---|---|
| Logger | ctx.logger.info(...) | import { logger } from 'motia' then logger.info(...) |
| Enqueue | ctx.enqueue({ topic, data }) | import { enqueue } from 'motia' then enqueue({ topic, data }) |
| State | ctx.state.set(group, key, value) | import { stateManager } from 'motia' then stateManager.set(group, key, value) |
| Streams | ctx.streams.name.get(groupId, id) | import { myStream } from './my.stream' then myStream.get(groupId, id) |
After migration, FlowContext only contains:
interface FlowContext<TEnqueueData = never, TInput = unknown> {
  traceId: string
  trigger: TriggerInfo
  is: {
    queue: (input: TInput) => input is ExtractQueueInput<TInput>
    http: (input: TInput) => input is ExtractApiInput<TInput>
    cron: (input: TInput) => input is never
    state: (input: TInput) => input is ExtractStateInput<TInput>
    stream: (input: TInput) => input is ExtractStreamInput<TInput>
  }
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult | undefined>
}
Old:
import { type Handlers, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async (input, { logger }) => {
  logger.info('Processing', { input })
}
New:
import { type Handlers, logger, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async (input) => {
  logger.info('Processing', { input })
}
Old:
import { type Handlers, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async ({ request }, { enqueue }) => {
  await enqueue({ topic: 'process-order', data: request.body })
  return { status: 200, body: { ok: true } }
}
New:
import { enqueue, type Handlers, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async ({ request }) => {
  await enqueue({ topic: 'process-order', data: request.body })
  return { status: 200, body: { ok: true } }
}
Old:
import { type Handlers, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  logger.info('Saving order')
  await state.set('orders', input.orderId, input)
  const orders = await state.list<Order>('orders')
}
New:
import { type Handlers, logger, type StepConfig, stateManager } from 'motia'
export const handler: Handlers<typeof config> = async (input) => {
  logger.info('Saving order')
  await stateManager.set('orders', input.orderId, input)
  const orders = await stateManager.list<Order>('orders')
}
Streams are no longer accessed via ctx.streams. Instead, create a Stream instance in a .stream.ts file and import it into your steps.
Old:
import { type Handlers, type StepConfig } from 'motia'
export const handler: Handlers<typeof config> = async (input, { streams, logger }) => {
  const todo = await streams.todo.get('inbox', todoId)
  await streams.todo.set('inbox', todoId, newTodo)
  await streams.todo.delete('inbox', todoId)
  await streams.todo.update('inbox', todoId, [
    { type: 'set', path: 'status', value: 'done' },
  ])
}
New:
First, define your stream in a .stream.ts file:
// todo.stream.ts
import { Stream, type StreamConfig } from 'motia'
import { z } from 'zod'
const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
})
export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,
}
export const todoStream = new Stream(config)
export type Todo = z.infer<typeof todoSchema>
Then import and use it in your step:
// create-todo.step.ts
import { type Handlers, logger, type StepConfig } from 'motia'
import { todoStream } from './todo.stream'
export const handler: Handlers<typeof config> = async ({ request }) => {
  const todo = await todoStream.get('inbox', todoId)
  await todoStream.set('inbox', todoId, newTodo)
  await todoStream.delete('inbox', todoId)
  await todoStream.update('inbox', todoId, [
    { type: 'set', path: 'status', value: 'done' },
  ])
}
In steps that use ctx.match(), logger, enqueue, and stateManager are still standalone imports; ctx is needed only for match(), traceId, and trigger:
Old:
export const handler: Handlers<typeof config> = async (_, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('Processing via API')
      await ctx.state.set('orders', orderId, request.body)
      await ctx.enqueue({ topic: 'order.processed', data: request.body })
      return { status: 200, body: { ok: true } }
    },
    queue: async (input) => {
      ctx.logger.info('Processing from queue')
      await ctx.state.set('orders', orderId, input)
    },
    cron: async () => {
      const orders = await ctx.state.list('pending-orders')
      ctx.logger.info('Batch processing', { count: orders.length })
    },
  })
}
New:
import { enqueue, type Handlers, logger, type StepConfig, stateManager } from 'motia'
export const handler: Handlers<typeof config> = async (_, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      logger.info('Processing via API')
      await stateManager.set('orders', orderId, request.body)
      await enqueue({ topic: 'order.processed', data: request.body })
      return { status: 200, body: { ok: true } }
    },
    queue: async (input) => {
      logger.info('Processing from queue')
      await stateManager.set('orders', orderId, input)
    },
    cron: async () => {
      const orders = await stateManager.list('pending-orders')
      logger.info('Batch processing', { count: orders.length })
    },
  })
}
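To make the dispatch behaviour concrete, here is a self-contained sketch of how a match-style helper might select a branch from the trigger type. It does not import motia: TriggerKind, MiniHandlers, and miniMatch are hypothetical stand-ins, and unlike the real ctx.match() this version is synchronous and untyped with respect to input.

```typescript
// Hypothetical stand-ins for illustration; these are not motia types.
type TriggerKind = 'http' | 'queue' | 'cron'

type MiniHandlers<TResult> = {
  http?: (input: unknown) => TResult
  queue?: (input: unknown) => TResult
  cron?: () => TResult
}

// Calls the branch named after the trigger kind. Returns undefined when no
// branch is defined for that kind, matching the `TResult | undefined` shape
// of match() in the FlowContext interface.
function miniMatch<TResult>(
  kind: TriggerKind,
  input: unknown,
  handlers: MiniHandlers<TResult>,
): TResult | undefined {
  switch (kind) {
    case 'http':
      return handlers.http?.(input)
    case 'queue':
      return handlers.queue?.(input)
    case 'cron':
      return handlers.cron?.()
  }
}

// A queue-triggered run picks the queue branch.
const viaQueue = miniMatch('queue', { orderId: 'o-1' }, {
  queue: () => 'queued',
  cron: () => 'batch',
})

// No http branch was supplied, so the result is undefined.
const viaHttp = miniMatch('http', {}, { queue: () => 'queued' })
```

Because the branches no longer close over ctx.logger or ctx.state, each one depends only on its own input and on module-level imports.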
HTTP step handlers now receive a MotiaHttpArgs object as their first argument instead of a bare request object. This object contains both request and response, enabling streaming patterns like SSE alongside standard request/response flows.
| Aspect | Old | New |
|---|---|---|
| First arg (TS/JS) | req (request object directly) | { request, response } (MotiaHttpArgs) |
| First arg (Python) | req (dict-like object) | request: ApiRequest or args: MotiaHttpArgs |
| Body access (TS/JS) | req.body | request.body |
| Path params (TS/JS) | req.pathParams | request.pathParams |
| Headers (TS/JS) | req.headers | request.headers |
| Body access (Python) | req.get("body", {}) | request.body |
| Path params (Python) | req.get("pathParams", {}).get("id") | request.path_params.get("id") |
| Return type (Python) | {"status": 200, "body": {...}} | ApiResponse(status=200, body={...}) |
| Middleware placement | Config root: middleware: [...] | Inside trigger: { type: 'http', ..., middleware: [...] } |
| Middleware first arg | req | { request, response } |
Old:
import { type Handlers, type StepConfig } from 'motia'
export const config = {
  name: 'GetUser',
  triggers: [
    { type: 'http', path: '/users/:id', method: 'GET' },
  ],
  enqueues: [],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (req, { logger }) => {
  const userId = req.pathParams.id
  logger.info('Getting user', { userId })
  return { status: 200, body: { id: userId } }
}
New:
import { type Handlers, logger, type StepConfig } from 'motia'
export const config = {
  name: 'GetUser',
  triggers: [
    { type: 'http', path: '/users/:id', method: 'GET' },
  ],
  enqueues: [],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }) => {
  const userId = request.pathParams.id
  logger.info('Getting user', { userId })
  return { status: 200, body: { id: userId } }
}
Key changes:

- Import logger from 'motia' instead of destructuring it from ctx
- Destructure { request } (or { request, response } for SSE) from the first argument
- Access request.body, request.pathParams, request.queryParams, request.headers
- Return { status, body, headers? }

interface MotiaHttpArgs<TBody = unknown> {
  request: MotiaHttpRequest<TBody>
  response: MotiaHttpResponse
}
interface MotiaHttpRequest<TBody = unknown> {
  pathParams: Record<string, string>
  queryParams: Record<string, string | string[]>
  body: TBody
  headers: Record<string, string | string[]>
  method: string
  requestBody: ChannelReader
}
type MotiaHttpResponse = {
  status: (statusCode: number) => void
  headers: (headers: Record<string, string>) => void
  stream: NodeJS.WritableStream
  close: () => void
}
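During an incremental migration it can help to run old-style handlers under the new first-argument shape before rewriting them. The sketch below shows one way to do that, assuming synchronous handlers for brevity. It does not import motia: MiniRequest, MiniHttpArgs, MiniResult, and adaptLegacyHandler are hypothetical names that mirror only part of the shapes listed above.

```typescript
// Local stand-ins that mirror part of the request shape; not imported from motia.
type MiniRequest<TBody> = {
  pathParams: Record<string, string>
  queryParams: Record<string, string | string[]>
  body: TBody
  headers: Record<string, string | string[]>
}
type MiniHttpArgs<TBody> = { request: MiniRequest<TBody>; response: unknown }
type MiniResult = { status: number; body: unknown }

// Hypothetical helper: wraps an old-style (req) handler so it accepts the
// new { request, response } argument by unwrapping `request` first.
function adaptLegacyHandler<TBody>(
  legacy: (req: MiniRequest<TBody>) => MiniResult,
): (args: MiniHttpArgs<TBody>) => MiniResult {
  return ({ request }) => legacy(request)
}

// An old-style handler that reads the bare request object.
const legacyGetUser = (req: MiniRequest<unknown>): MiniResult => ({
  status: 200,
  body: { id: req.pathParams.id },
})

// The wrapped handler is called with the new argument shape.
const getUser = adaptLegacyHandler(legacyGetUser)
const result = getUser({
  request: { pathParams: { id: '42' }, queryParams: {}, body: null, headers: {} },
  response: null,
})
```

A wrapper like this is only a stopgap: it hides response from the legacy handler, so streaming patterns such as SSE still require rewriting the handler itself.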
When using ctx.match(), the HTTP branch handler also receives MotiaHttpArgs:
Old:
return ctx.match({
  http: async (request) => {
    const { userId } = request.body
    return { status: 200, body: { ok: true } }
  },
})
New:
return ctx.match({
  http: async ({ request }) => {
    const { userId } = request.body
    return { status: 200, body: { ok: true } }
  },
})
Old:
config = {
    "name": "GetUser",
    "triggers": [
        {"type": "http", "path": "/users/:id", "method": "GET"}
    ],
    "enqueues": [],
}

async def handler(req, ctx):
    user_id = req.get("pathParams", {}).get("id")
    ctx.logger.info("Getting user", {"userId": user_id})
    return {"status": 200, "body": {"id": user_id}}
New:
from typing import Any
from motia import ApiRequest, ApiResponse, http, logger
config = {
    "name": "GetUser",
    "triggers": [
        http("GET", "/users/:id"),
    ],
    "enqueues": [],
}

async def handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
    user_id = request.path_params.get("id")
    logger.info("Getting user", {"userId": user_id})
    return ApiResponse(status=200, body={"id": user_id})
Key changes:

- Import ApiRequest, ApiResponse, logger from motia
- Use the http() helper for trigger definitions
- logger, enqueue, and stateManager are standalone imports, not accessed via ctx
- Access request.body, request.path_params, request.query_params, request.headers
- Return ApiResponse(status=..., body=...) instead of a plain dict

class ApiRequest(BaseModel, Generic[TBody]):
    path_params: dict[str, str]
    query_params: dict[str, str | list[str]]
    body: TBody | None
    headers: dict[str, str | list[str]]

class ApiResponse(BaseModel, Generic[TOutput]):
    status: int
    body: Any
    headers: dict[str, str] = {}
Middleware has moved from the config root into the HTTP trigger object.
Old:
export const config = {
  name: 'ProtectedEndpoint',
  triggers: [
    { type: 'http', path: '/protected', method: 'GET' },
  ],
  middleware: [authMiddleware],
  enqueues: [],
} as const satisfies StepConfig
New:
export const config = {
  name: 'ProtectedEndpoint',
  triggers: [
    { type: 'http', path: '/protected', method: 'GET', middleware: [authMiddleware] },
  ],
  enqueues: [],
} as const satisfies StepConfig
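The relocation itself is mechanical, as the following sketch suggests. It operates on simplified, hypothetical config shapes (Middleware, Trigger, LegacyConfig, MigratedConfig) rather than motia's StepConfig, and the queue trigger with a topic field in the usage example is likewise only an assumption for illustration.

```typescript
// Hypothetical, simplified config shapes; not motia's StepConfig.
type Middleware = (...args: unknown[]) => unknown
type Trigger = { type: string; middleware?: Middleware[]; [key: string]: unknown }
type LegacyConfig = { name: string; triggers: Trigger[]; middleware?: Middleware[] }
type MigratedConfig = { name: string; triggers: Trigger[] }

// Copies root-level middleware onto every HTTP trigger and drops the root key.
// Non-HTTP triggers are returned unchanged.
function moveMiddlewareIntoTriggers(config: LegacyConfig): MigratedConfig {
  const { middleware, ...rest } = config
  if (!middleware || middleware.length === 0) return rest
  return {
    ...rest,
    triggers: rest.triggers.map((trigger) =>
      trigger.type === 'http'
        ? { ...trigger, middleware: [...middleware, ...(trigger.middleware ?? [])] }
        : trigger,
    ),
  }
}

// Placeholder middleware so the example is self-contained.
const exampleMiddleware: Middleware = () => undefined

const migrated = moveMiddlewareIntoTriggers({
  name: 'ProtectedEndpoint',
  triggers: [
    { type: 'http', path: '/protected', method: 'GET' },
    { type: 'queue', topic: 'orders' }, // assumed trigger shape, for illustration
  ],
  middleware: [exampleMiddleware],
})
```

Placing root middleware ahead of any trigger-level middleware is a design choice of this sketch; the guide does not state an ordering rule.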
Old:
const authMiddleware: ApiMiddleware = async (req, ctx, next) => {
  if (!req.headers.authorization) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  return next()
}
New:
const authMiddleware: ApiMiddleware = async ({ request }, ctx, next) => {
  if (!request.headers.authorization) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  return next()
}
SSE is enabled by the response object in MotiaHttpArgs. Instead of returning a response, you write directly to the stream.
import { type Handlers, http, logger, type StepConfig } from 'motia'
export const config = {
  name: 'SSE Example',
  description: 'Streams data back to the client as SSE',
  flows: ['sse-example'],
  triggers: [http('POST', '/sse')],
  enqueues: [],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request, response }) => {
  logger.info('SSE request received')
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })
  const chunks: string[] = []
  for await (const chunk of request.requestBody.stream) {
    chunks.push(Buffer.from(chunk).toString('utf-8'))
  }
  const items = ['alpha', 'bravo', 'charlie']
  for (const item of items) {
    response.stream.write(`event: item\ndata: ${JSON.stringify({ item })}\n\n`)
    await new Promise((resolve) => setTimeout(resolve, 500))
  }
  response.stream.write(`event: done\ndata: ${JSON.stringify({ total: items.length })}\n\n`)
  response.close()
}
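Each write in the handler above emits one SSE frame: an event line, a data line, and a blank line that terminates the frame. Repeating that template inline is easy to get wrong, so a small helper can centralise it. formatSseEvent below is a hypothetical name, not part of motia.

```typescript
// Hypothetical helper: serialises one SSE frame as an `event:` line,
// a `data:` line carrying JSON, and a blank line that ends the frame.
function formatSseEvent(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
}

// Builds the same frame the handler writes for the first item.
const frame = formatSseEvent('item', { item: 'alpha' })
```

With this in place, a write becomes response.stream.write(formatSseEvent('item', { item })). Because JSON.stringify without an indent argument produces no raw newlines, the payload stays on a single data line.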
import asyncio
import json
from typing import Any
from motia import MotiaHttpArgs, http, logger
config = {
    "name": "SSE Example",
    "description": "Streams data back to the client as SSE",
    "flows": ["sse-example"],
    "triggers": [
        http("POST", "/sse"),
    ],
    "enqueues": [],
}

async def handler(args: MotiaHttpArgs[Any]) -> None:
    request = args.request
    response = args.response
    logger.info("SSE request received")
    await response.status(200)
    await response.headers({
        "content-type": "text/event-stream",
        "cache-control": "no-cache",
        "connection": "keep-alive",
    })
    raw_chunks: list[str] = []
    async for chunk in request.request_body.stream:
        if isinstance(chunk, bytes):
            raw_chunks.append(chunk.decode("utf-8", errors="replace"))
        else:
            raw_chunks.append(str(chunk))
    items = ["alpha", "bravo", "charlie"]
    for item in items:
        response.writer.stream.write(
            f"event: item\ndata: {json.dumps({'item': item})}\n\n".encode("utf-8")
        )
        await asyncio.sleep(0.5)
    response.writer.stream.write(
        f"event: done\ndata: {json.dumps({'total': len(items)})}\n\n".encode("utf-8")
    )
    response.close()
Key points:

- Destructure request and response from the first argument
- Use response.status() and response.headers() to configure the response
- Write to response.stream (TS/JS) or response.writer.stream (Python)
- Call response.close() when done streaming

Migration checklist, context API:

- Replace ctx.logger / context.logger with import { logger } from 'motia'
- Replace ctx.enqueue / context.enqueue with import { enqueue } from 'motia'
- Replace ctx.state / context.state with import { stateManager } from 'motia'
- Replace ctx.streams.name / context.streams.name with import { myStream } from './my.stream'
- Create .stream.ts files with new Stream(config) for each stream used
- Remove state, enqueue, logger, streams from handler destructuring of ctx
- If ctx is only used for destructuring those removed properties, the second argument can be omitted entirely

Migration checklist, HTTP steps (TypeScript/JavaScript):

- Change handler signatures from (req, ctx) to ({ request }, ctx) for all HTTP steps
- Replace req.body with request.body
- Replace req.pathParams with request.pathParams
- Replace req.queryParams with request.queryParams
- Replace req.headers with request.headers
- Move middleware arrays from config root into HTTP trigger objects
- Change middleware signatures from (req, ctx, next) to ({ request }, ctx, next)
- Update ctx.match() HTTP handlers: change (request) => to ({ request }) =>

Migration checklist, HTTP steps (Python):

- Add imports: from motia import ApiRequest, ApiResponse, FlowContext, http
- Use the http() helper in trigger definitions
- Change the handler signature to handler(request: ApiRequest[Any], ctx: FlowContext[Any]) -> ApiResponse[Any]
- Replace req.get("body", {}) with request.body
- Replace req.get("pathParams", {}).get("id") with request.path_params.get("id")
- Replace req.get("queryParams", {}) with request.query_params
- Replace req.get("headers", {}) with request.headers
- Return ApiResponse(status=..., body=...) instead of plain dicts
- For SSE handlers, use MotiaHttpArgs instead of ApiRequest
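One way to check a codebase for leftover uses of the removed context properties is a simple text scan. The sketch below is a rough heuristic under stated assumptions: findRemovedContextUsage is a hypothetical helper, it only recognises the parameter names ctx and context, and it does not detect destructured forms such as (input, { logger }).

```typescript
// Hypothetical helper: lists which removed context properties a source string
// still references through `ctx.` or `context.` member access.
function findRemovedContextUsage(source: string): string[] {
  const pattern = /\b(?:ctx|context)\.(logger|enqueue|state|streams)\b/g
  const found: string[] = []
  let match: RegExpExecArray | null
  while ((match = pattern.exec(source)) !== null) {
    const name = match[1]
    // Record each property once, however often it appears.
    if (name !== undefined && found.indexOf(name) === -1) found.push(name)
  }
  return found.sort()
}

// ctx.match is still valid, so only logger and state are reported.
const legacySource =
  "ctx.logger.info('x'); await ctx.state.set('orders', id, input); ctx.match({})"
const leftovers = findRemovedContextUsage(legacySource)

// A migrated snippet that uses the standalone imports reports nothing.
const clean = findRemovedContextUsage("logger.info('x'); ctx.match({})")
```

A scan like this only narrows down where to look; the type checker remains the reliable signal, since the removed properties no longer exist on FlowContext.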