.agents/skills/backend-dev-guidelines/references/middleware-guide.md
Complete guide to middleware patterns in Langfuse's Next.js + tRPC architecture.
File: web/src/server/api/trpc.ts
tRPC middleware in Langfuse is composable and type-safe. Each middleware enriches the context and provides guarantees to subsequent middleware.
1. Error Handling Middleware (withErrorHandling)
Intercepts all errors and transforms them into user-friendly tRPC errors:
const withErrorHandling = t.middleware(async ({ ctx, next }) => {
const res = await next({ ctx });
if (!res.ok) {
if (res.error.cause instanceof ClickHouseResourceError) {
// Surface ClickHouse resource errors with advice message
res.error = new TRPCError({
code: "SERVICE_UNAVAILABLE",
message: ClickHouseResourceError.ERROR_ADVICE_MESSAGE,
});
} else {
// Transform 5xx errors to not expose internals
const { code, httpStatus } = resolveError(res.error);
const isSafeToExpose = httpStatus >= 400 && httpStatus < 500;
res.error = new TRPCError({
code,
cause: null, // do not expose stack traces
message: isSafeToExpose
? res.error.message
: "Internal error. We have been notified and are working on it.",
});
}
}
return res;
});
2. OpenTelemetry Instrumentation (withOtelInstrumentation)
Propagates OpenTelemetry context with Langfuse-specific baggage:
const withOtelInstrumentation = t.middleware(async (opts) => {
const actualInput = await opts.getRawInput();
const baggageCtx = contextWithLangfuseProps({
headers: opts.ctx.headers,
userId: opts.ctx.session?.user?.id,
projectId: (actualInput as Record<string, string>)?.projectId,
});
return opentelemetry.context.with(baggageCtx, () => opts.next());
});
3. Authentication Middleware (enforceUserIsAuthed)
Ensures user is logged in via NextAuth session:
const enforceUserIsAuthed = t.middleware(({ ctx, next }) => {
if (!ctx.session || !ctx.session.user) {
throw new TRPCError({ code: "UNAUTHORIZED" });
}
return next({
ctx: {
// infers the `session` as non-nullable
session: { ...ctx.session, user: ctx.session.user },
},
});
});
4. Project Membership Middleware (enforceUserIsAuthedAndProjectMember)
Validates that the user is a member of the project specified in input:
const enforceUserIsAuthedAndProjectMember = t.middleware(async (opts) => {
const { ctx, next } = opts;
if (!ctx.session || !ctx.session.user) {
throw new TRPCError({ code: "UNAUTHORIZED" });
}
const actualInput = await opts.getRawInput();
const parsedInput = inputProjectSchema.safeParse(actualInput);
if (!parsedInput.success) {
throw new TRPCError({
code: "BAD_REQUEST",
message: "Invalid input, projectId is required",
});
}
const projectId = parsedInput.data.projectId;
const sessionProject = ctx.session.user.organizations
.flatMap((org) => org.projects.map((project) => ({ ...project, organization: org })))
.find((project) => project.id === projectId);
if (!sessionProject) {
throw new TRPCError({
code: "UNAUTHORIZED",
message: "User is not a member of this project",
});
}
return next({
ctx: {
session: {
...ctx.session,
user: ctx.session.user,
orgId: sessionProject.organization.id,
orgRole: sessionProject.organization.role,
projectId: projectId,
projectRole: sessionProject.role,
},
},
});
});
5. Organization Membership Middleware (enforceIsAuthedAndOrgMember)
Validates organization membership:
const enforceIsAuthedAndOrgMember = t.middleware(async (opts) => {
const { ctx, next } = opts;
if (!ctx.session || !ctx.session.user) {
throw new TRPCError({ code: "UNAUTHORIZED" });
}
const actualInput = await opts.getRawInput();
const result = inputOrganizationSchema.safeParse(actualInput);
if (!result.success) {
throw new TRPCError({
code: "BAD_REQUEST",
message: "Invalid input, orgId is required",
});
}
const orgId = result.data.orgId;
const sessionOrg = ctx.session.user.organizations.find((org) => org.id === orgId);
if (!sessionOrg && ctx.session.user.admin !== true) {
throw new TRPCError({
code: "UNAUTHORIZED",
message: "User is not a member of this organization",
});
}
return next({
ctx: {
session: {
...ctx.session,
user: ctx.session.user,
orgId: orgId,
orgRole: ctx.session.user.admin === true ? Role.OWNER : sessionOrg!.role,
},
},
});
});
6. Trace Access Middleware (enforceTraceAccess)
Special middleware for trace-level routes that supports public traces:
const enforceTraceAccess = t.middleware(async (opts) => {
const { ctx, next } = opts;
const actualInput = await opts.getRawInput();
const result = inputTraceSchema.safeParse(actualInput);
if (!result.success) {
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid input" });
}
const trace = await getTraceById({
traceId: result.data.traceId,
projectId: result.data.projectId,
timestamp: result.data.timestamp ?? undefined,
});
if (!trace) {
throw new TRPCError({ code: "NOT_FOUND", message: "Trace not found" });
}
const sessionProject = ctx.session?.user?.organizations
.flatMap((org) => org.projects)
.find(({ id }) => id === result.data.projectId);
// Allow access if:
// 1. User is a project member
// 2. Trace is public
// 3. User is admin
if (!trace.public && !sessionProject && ctx.session?.user?.admin !== true) {
throw new TRPCError({
code: "UNAUTHORIZED",
message: "User is not a member of this project and this trace is not public",
});
}
return next({
ctx: {
session: { ...ctx.session, projectRole: sessionProject?.role },
trace: trace, // pass the trace to avoid refetching
},
});
});
Langfuse exports composed procedures with middleware chains:
// 1. Public procedure (no auth required)
export const publicProcedure = withOtelTracingProcedure.use(withErrorHandling);
// 2. Authenticated procedure (NextAuth session required)
export const authenticatedProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceUserIsAuthed);
// 3. Project-scoped procedure (project membership required)
export const protectedProjectProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceUserIsAuthedAndProjectMember);
// 4. Organization-scoped procedure
export const protectedOrganizationProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceIsAuthedAndOrgMember);
// 5. Trace access procedure (public traces supported)
export const protectedGetTraceProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceTraceAccess);
// 6. Session access procedure
export const protectedGetSessionProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceSessionAccess);
// 7. Admin API key procedure (for admin operations)
export const adminProcedure = withOtelTracingProcedure
.use(withErrorHandling)
.use(enforceAdminAuth);
Files: web/src/features/public-api/server/
Wraps all public API routes with CORS, error handling, and OpenTelemetry:
export function withMiddlewares(handlers: Handlers) {
return async (req: NextApiRequest, res: NextApiResponse) => {
const ctx = contextWithLangfuseProps({ headers: req.headers });
return opentelemetry.context.with(ctx, async () => {
try {
// 1. CORS middleware
await runMiddleware(req, res, cors);
// 2. HTTP method routing
const method = req.method as HttpMethod;
if (!handlers[method]) throw new MethodNotAllowedError();
// 3. Execute handler
return await handlers[method](req, res);
} catch (error) {
// 4. Error handling
if (error instanceof BaseError) {
if (error.httpCode >= 500) traceException(error);
return res.status(error.httpCode).json({
message: error.message,
error: error.name,
});
}
if (error instanceof ClickHouseResourceError) {
return res.status(524).json({
message: ClickHouseResourceError.ERROR_ADVICE_MESSAGE,
error: "Request is taking too long to process.",
});
}
if (isZodError(error)) {
return res.status(400).json({
message: "Invalid request data",
error: error.issues,
});
}
traceException(error);
return res.status(500).json({
message: "Internal Server Error",
error: error instanceof Error ? error.message : "Unknown error",
});
}
});
};
}
Usage:
// web/src/pages/api/public/datasets/[datasetName]/items.ts
export default withMiddlewares({
GET: getDatasetItemsHandler,
POST: createDatasetItemHandler,
});
Factory function for authenticated public API routes with:
export const createAuthedProjectAPIRoute = <TQuery, TBody, TResponse>(
routeConfig: RouteConfig<TQuery, TBody, TResponse>
) => {
return async (req: NextApiRequest, res: NextApiResponse) => {
// 1. Authentication (verifyAuth)
const auth = await verifyAuth(req, routeConfig.isAdminApiKeyAuthAllowed || false);
// 2. Rate limiting
const rateLimitResponse = await RateLimitService.getInstance().rateLimitRequest(
auth.scope,
routeConfig.rateLimitResource || "public-api"
);
if (rateLimitResponse?.isRateLimited()) {
return rateLimitResponse.sendRestResponseIfLimited(res);
}
// 3. Input validation
const query = routeConfig.querySchema
? routeConfig.querySchema.parse(req.query)
: {};
const body = routeConfig.bodySchema
? routeConfig.bodySchema.parse(req.body)
: {};
// 4. Execute with OpenTelemetry context
const ctx = contextWithLangfuseProps({
headers: req.headers,
projectId: auth.scope.projectId,
});
return opentelemetry.context.with(ctx, async () => {
const response = await routeConfig.fn({ query, body, req, res, auth });
// 5. Response validation (dev only)
if (env.NODE_ENV === "development" && routeConfig.responseSchema) {
const parsingResult = routeConfig.responseSchema.safeParse(response);
if (!parsingResult.success) {
logger.error("Response validation failed:", parsingResult.error);
}
}
res.status(routeConfig.successStatusCode || 200).json(response);
});
};
};
Usage:
// web/src/pages/api/public/traces/[traceId].ts
export default createAuthedProjectAPIRoute({
name: "Get Trace",
querySchema: GetTraceV1Query,
responseSchema: GetTraceV1Response,
fn: async ({ query, auth }) => {
const trace = await getTraceById({
traceId: query.traceId,
projectId: auth.scope.projectId,
});
return transformTraceToApiResponse(trace);
},
});
tRPC uses NextAuth sessions stored in JWT cookies:
// Context creation with session
export const createTRPCContext = async (opts: CreateNextContextOptions) => {
const { req, res } = opts;
const session = await getServerAuthSession({ req, res });
addUserToSpan({
userId: session?.user?.id,
email: session?.user?.email ?? undefined,
});
return {
session,
headers: req.headers,
prisma,
DB,
};
};
Session types:
// Base authenticated context
export type AuthedContext = {
session: { user: NonNullable<Session["user"]> };
};
// Project-scoped context
export type ProjectAuthedContext = {
session: AuthedContext["session"] & {
orgId: string;
orgRole: Role;
projectId: string;
projectRole: Role;
};
};
Public APIs use Basic Auth with API keys:
async function verifyBasicAuth(authHeader: string | undefined) {
const regularAuth = await new ApiAuthService(prisma, redis)
.verifyAuthHeaderAndReturnScope(authHeader);
if (!regularAuth.validKey) {
throw { status: 401, message: regularAuth.error };
}
if (regularAuth.scope.accessLevel !== "project") {
throw { status: 401, message: "Access denied - need basic auth with secret key" };
}
return regularAuth;
}
Admin API Key Authentication (self-hosted only):
async function verifyAdminApiKeyAuth(req: NextApiRequest) {
// Requires:
// 1. Authorization: Bearer <ADMIN_API_KEY>
// 2. x-langfuse-admin-api-key: <ADMIN_API_KEY>
// 3. x-langfuse-project-id: <project-id>
if (env.NEXT_PUBLIC_LANGFUSE_CLOUD_REGION) {
throw { status: 403, message: "Admin API key auth not available on Langfuse Cloud" };
}
const adminApiKey = env.ADMIN_API_KEY;
const bearerToken = req.headers.authorization?.replace("Bearer ", "");
const adminApiKeyHeader = req.headers["x-langfuse-admin-api-key"];
// Timing-safe comparison
const isValid =
crypto.timingSafeEqual(Buffer.from(bearerToken), Buffer.from(adminApiKey)) &&
crypto.timingSafeEqual(Buffer.from(adminApiKeyHeader), Buffer.from(adminApiKey));
if (!isValid) throw { status: 401, message: "Invalid admin API key" };
const projectId = req.headers["x-langfuse-project-id"];
const project = await prisma.project.findUnique({ where: { id: projectId } });
if (!project) throw { status: 404, message: "Project not found" };
return { validKey: true, scope: { projectId, accessLevel: "project" } };
}
All tRPC errors go through withErrorHandling middleware:
Error types handled:
SERVICE_UNAVAILABLE (524)Example:
if (!res.ok) {
if (res.error.cause instanceof ClickHouseResourceError) {
res.error = new TRPCError({
code: "SERVICE_UNAVAILABLE",
message: ClickHouseResourceError.ERROR_ADVICE_MESSAGE,
});
} else {
const { code, httpStatus } = resolveError(res.error);
const isSafeToExpose = httpStatus >= 400 && httpStatus < 500;
res.error = new TRPCError({
code,
cause: null,
message: isSafeToExpose ? res.error.message : "Internal error.",
});
}
}
Public API uses withMiddlewares for error handling:
catch (error) {
// 1. BaseError (custom application errors)
if (error instanceof BaseError) {
if (error.httpCode >= 500) traceException(error);
return res.status(error.httpCode).json({
message: error.message,
error: error.name,
});
}
// 2. ClickHouseResourceError (query timeouts, memory limits)
if (error instanceof ClickHouseResourceError) {
return res.status(524).json({
message: ClickHouseResourceError.ERROR_ADVICE_MESSAGE,
error: "Request is taking too long to process.",
});
}
// 3. Zod validation errors
if (isZodError(error)) {
return res.status(400).json({
message: "Invalid request data",
error: error.issues,
});
}
// 4. Prisma errors
if (isPrismaException(error)) {
traceException(error);
return res.status(500).json({
message: "Internal Server Error",
error: "An unknown error occurred",
});
}
// 5. Unknown errors
traceException(error);
return res.status(500).json({
message: "Internal Server Error",
error: error instanceof Error ? error.message : "Unknown error",
});
}
All requests (tRPC and public API) propagate OpenTelemetry context with Langfuse-specific baggage.
import { contextWithLangfuseProps } from "@langfuse/shared/src/server";
import * as opentelemetry from "@opentelemetry/api";
// Create context with Langfuse baggage
const ctx = contextWithLangfuseProps({
headers: req.headers,
userId: session?.user?.id,
projectId: input?.projectId,
});
// Execute with context
return opentelemetry.context.with(ctx, async () => {
// All instrumented code inside here will have access to baggage
return await handler();
});
Baggage includes:
userId - User ID from sessionprojectId - Project ID from inputheaders - Request headers for trace propagationconst withOtelInstrumentation = t.middleware(async (opts) => {
const actualInput = await opts.getRawInput();
const baggageCtx = contextWithLangfuseProps({
headers: opts.ctx.headers,
userId: opts.ctx.session?.user?.id,
projectId: (actualInput as Record<string, string>)?.projectId,
});
return opentelemetry.context.with(baggageCtx, () => opts.next());
});
// Used with Baselime tracing
const withOtelTracingProcedure = t.procedure
.use(withOtelInstrumentation)
.use(tracing({ collectInput: true, collectResult: true }));
tRPC procedures are composed by chaining middleware:
// Base procedure with tracing + error handling
const baseProcedure = withOtelTracingProcedure.use(withErrorHandling);
// Add authentication
const authedProcedure = baseProcedure.use(enforceUserIsAuthed);
// Add project scoping
const projectProcedure = authedProcedure.use(enforceUserIsAuthedAndProjectMember);
import { protectedProjectProcedure } from "@/src/server/api/trpc";
export const tracesRouter = createTRPCRouter({
// Input automatically validated against Zod schema
all: protectedProjectProcedure
.input(
z.object({
projectId: z.string(),
page: z.number().optional(),
limit: z.number().optional(),
})
)
.query(async ({ input, ctx }) => {
// ctx.session.projectId is guaranteed to exist
// ctx.session.projectRole contains user's role
const traces = await getTraces({
projectId: input.projectId,
page: input.page ?? 0,
limit: input.limit ?? 50,
});
return traces;
}),
byId: protectedGetTraceProcedure
.input(
z.object({
traceId: z.string(),
projectId: z.string(),
timestamp: z.date().nullish(),
})
)
.query(async ({ input, ctx }) => {
// ctx.trace is guaranteed to exist (fetched by middleware)
// No need to refetch
return ctx.trace;
}),
});
Middleware executes in the order it's chained:
protectedProjectProcedure
.use(withErrorHandling) // 1. Wraps entire execution
.use(enforceUserIsAuthed) // 2. Validates session exists
.use(enforceUserIsAuthedAndProjectMember) // 3. Validates project membership
.input(schema) // 4. Validates input
.query(async ({ input, ctx }) => { // 5. Executes query
// ...
});
Context enrichment:
Each middleware can enrich the context:
// After enforceUserIsAuthed:
ctx.session.user // NonNullable<User>
// After enforceUserIsAuthedAndProjectMember:
ctx.session.projectId // string
ctx.session.projectRole // Role (OWNER | ADMIN | MEMBER | VIEWER)
ctx.session.orgId // string
ctx.session.orgRole // Role
// After enforceTraceAccess:
ctx.trace // TraceRecord (pre-fetched)
Related Files: