showcase/shell-docs/src/content/ag-ui/sdk/js/client/middleware.mdx
The middleware system in @ag-ui/client provides a powerful way to transform, filter, and augment event streams flowing through agents. Middleware can intercept and modify events, add logging, implement authentication, filter tool calls, and more.
AbstractAgent,
BaseEvent,
EventType,
FilterToolCallsMiddleware,
Message,
Middleware,
MiddlewareFunction,
RunAgentInput
} from "@ag-ui/client"
Examples below assume the relevant RxJS operators/utilities (map, tap, filter, finalize, catchError, switchMap, timer, of, etc.) are imported.
A function that transforms the event stream.
type MiddlewareFunction = (
input: RunAgentInput,
next: AbstractAgent,
) => Observable<BaseEvent>;
Function middleware receives events exactly as emitted by the next middleware or agent.
Abstract base class for creating middleware.
interface EventWithState {
event: BaseEvent;
messages: Message[];
state: unknown;
}
abstract class Middleware {
abstract run(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
protected runNext(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
protected runNextWithState(
input: RunAgentInput,
next: AbstractAgent,
): Observable<EventWithState>;
}
runNext() runs next.run(...) and normalizes chunk events into complete TEXT_MESSAGE_* / TOOL_CALL_* sequences.runNextWithState() does the same and also provides accumulated messages and state after each event is applied.The simplest way to create middleware is with a function. Function middleware is ideal for stateless transformations.
const loggingMiddleware: MiddlewareFunction = (input, next) => {
console.log(`[${new Date().toISOString()}] Starting run ${input.runId}`);
return next.run(input).pipe(
tap((event) => console.log(`Event: ${event.type}`)),
finalize(() => console.log(`Run ${input.runId} completed`)),
);
};
agent.use(loggingMiddleware);
const timestampMiddleware: MiddlewareFunction = (input, next) => {
return next.run(input).pipe(
map((event) => {
if (event.type === EventType.RUN_STARTED) {
return {
...event,
timestamp: Date.now(),
};
}
return event;
}),
);
};
const errorMiddleware: MiddlewareFunction = (input, next) => {
return next.run(input).pipe(
catchError((error) => {
console.error("Agent error:", error);
// Return error event
return of({
type: EventType.RUN_ERROR,
message: error.message,
} as BaseEvent);
}),
);
};
For stateful operations or complex logic, extend the Middleware class.
class CounterMiddleware extends Middleware {
private totalEvents = 0;
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
let runEvents = 0;
return this.runNext(input, next).pipe(
tap(() => {
runEvents++;
this.totalEvents++;
}),
finalize(() => {
console.log(`Run events: ${runEvents}, Total: ${this.totalEvents}`);
}),
);
}
}
agent.use(new CounterMiddleware());
class AuthMiddleware extends Middleware {
constructor(
private apiKey: string,
private headerName: string = "Authorization",
) {
super();
}
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
// Attach auth data in forwardedProps for downstream transport/agent logic
const authenticatedInput: RunAgentInput = {
...input,
forwardedProps: {
...input.forwardedProps,
auth: {
headerName: this.headerName,
value: `Bearer ${this.apiKey}`,
},
},
};
return this.runNext(authenticatedInput, next);
}
}
const apiKey = process.env.API_KEY ?? "";
agent.use(new AuthMiddleware(apiKey));
Class middleware can use helper methods from Middleware to work with normalized events and accumulated state.
runNext()runNext() forwards execution and normalizes chunk events into full TEXT_MESSAGE_* and TOOL_CALL_* events.
runNextWithState()runNextWithState() returns { event, messages, state }, where messages and state are the accumulated values after each event has been applied.
class MetricsWithStateMiddleware extends Middleware {
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
return this.runNextWithState(input, next).pipe(
tap(({ event, messages, state }) => {
if (event.type === EventType.RUN_FINISHED) {
const stateKeyCount =
state && typeof state === "object" ? Object.keys(state).length : 0;
console.log(
"Assistant messages:",
messages.filter((m) => m.role === "assistant").length,
);
console.log("Final state keys:", stateKeyCount);
}
}),
map(({ event }) => event),
);
}
}
Filters tool calls based on allowed or disallowed lists.
FilterToolCallsMiddleware filters emitted TOOL_CALL_* events (including args/results for blocked calls). It does not prevent tool execution in the upstream model/runtime.
type FilterToolCallsConfig =
| { allowedToolCalls: string[]; disallowedToolCalls?: never }
| { disallowedToolCalls: string[]; allowedToolCalls?: never };
const allowFilter = new FilterToolCallsMiddleware({
allowedToolCalls: ["search", "calculate", "summarize"],
});
agent.use(allowFilter);
You can also use disallowedToolCalls instead of allowedToolCalls.
const timingMiddleware: MiddlewareFunction = (input, next) => {
const startTime = performance.now();
return next.run(input).pipe(
finalize(() => {
const duration = performance.now() - startTime;
console.log(`Execution time: ${duration.toFixed(2)}ms`);
}),
);
};
class RateLimitMiddleware extends Middleware {
private lastCall = 0;
constructor(private minInterval: number) {
super();
}
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
const now = Date.now();
const elapsed = now - this.lastCall;
if (elapsed < this.minInterval) {
// Delay the execution
return timer(this.minInterval - elapsed).pipe(
switchMap(() => {
this.lastCall = Date.now();
return this.runNext(input, next);
}),
);
}
this.lastCall = now;
return this.runNext(input, next);
}
}
// Limit to one request per second
agent.use(new RateLimitMiddleware(1000));
Other common patterns include retry logic and response caching.
Multiple middleware can be combined to create sophisticated processing pipelines.
const logger = loggingMiddleware;
const auth = new AuthMiddleware(apiKey);
const filter = new FilterToolCallsMiddleware({ allowedToolCalls: ["search"] });
agent.use(logger, auth, filter);
// Execution flow:
// logger → auth → filter → agent → filter → auth → logger
const debugMiddleware: MiddlewareFunction = (input, next) => {
const isDebug = input.forwardedProps?.debug === true;
if (!isDebug) {
return next.run(input);
}
return next.run(input).pipe(
tap((event) => {
console.debug("[DEBUG]", JSON.stringify(event, null, 2));
}),
);
};
Middleware added with agent.use(...) runs in runAgent() (and the legacy bridge path). connectAgent() currently calls connect() directly and does not run middleware.
The middleware system is fully typed for excellent IDE support:
AbstractAgent,
BaseEvent,
MiddlewareFunction,
RunAgentInput
} from "@ag-ui/client"
// Type-safe middleware function
const typedMiddleware: MiddlewareFunction = (
input: RunAgentInput,
next: AbstractAgent
): Observable<BaseEvent> => {
return next.run(input)
}