skills/copilotkit-agui/references/client-sdk.md
API reference for the AG-UI client SDK (@ag-ui/client).
The client re-exports everything from @ag-ui/core, so you typically only need one import:
import {
// Agent classes
AbstractAgent,
HttpAgent,
// Types from @ag-ui/core
EventType,
BaseEvent,
RunAgentInput,
Message,
// Middleware
Middleware,
FilterToolCallsMiddleware,
// Event application
defaultApplyEvents,
// Verification
verifyEvents,
// Transforms
transformChunks,
transformHttpEventStream,
// Compact utilities
compactEvents,
} from "@ag-ui/client";
Base class for all AG-UI agents. Manages conversation state, message history, event processing, and subscriber notification.
interface AgentConfig {
agentId?: string; // Unique agent identifier
description?: string; // Human-readable description
threadId?: string; // Conversation thread ID (auto-generated if omitted)
initialMessages?: Message[]; // Starting message history
initialState?: State; // Starting state object
debug?: boolean; // Enable debug logging
}
const agent = new MyAgent({
agentId: "my-agent",
threadId: "thread-1",
initialMessages: [{ id: "1", role: "user", content: "Hello" }],
initialState: { preference: "dark" },
debug: true,
});
| Property | Type | Description |
|---|---|---|
agentId | string? | Agent identifier |
description | string | Human-readable description |
threadId | string | Conversation thread ID |
messages | Message[] | Current message history |
state | State | Current agent state |
debug | boolean | Debug logging enabled |
isRunning | boolean | Whether a run is currently active |
subscribers | AgentSubscriber[] | Registered event subscribers |
run()Must be implemented by subclasses. Returns an RxJS Observable of AG-UI events.
abstract run(input: RunAgentInput): Observable<BaseEvent>;
runAgent(parameters?, subscriber?)Executes a full agent run with event application, state management, and subscriber notification.
interface RunAgentParameters {
runId?: string;
tools?: Tool[];
context?: Context[];
forwardedProps?: any;
}
interface RunAgentResult {
result: any; // From RUN_FINISHED.result
newMessages: Message[]; // Messages added during this run
}
const { result, newMessages } = await agent.runAgent({
runId: "run-1",
tools: [{ name: "search", description: "Search docs", parameters: {} }],
context: [{ description: "Current page", value: "/dashboard" }],
forwardedProps: { model: "gpt-4" },
});
The pipeline internally:
RunAgentInput from current state + parametersrun(input) to get the event ObservabletransformChunks)verifyEvents)defaultApplyEvents)connectAgent(parameters?, subscriber?)Like runAgent() but calls the protected connect() method instead of run(). Used for persistent connections (WebSocket).
detachActiveRun()Immediately stops processing the current run's event stream. The run's Observable is unsubscribed and the finalize handler runs.
await agent.detachActiveRun();
abortRun()Aborts the current run. For HttpAgent, this calls AbortController.abort().
subscribe(subscriber)Registers an event subscriber. Returns an object with unsubscribe().
const subscription = agent.subscribe({
onTextMessageContentEvent: ({ event, textMessageBuffer }) => {
console.log("Streaming:", textMessageBuffer + event.delta);
},
onRunFinishedEvent: ({ result }) => {
console.log("Done:", result);
},
});
// Later:
subscription.unsubscribe();
use(...middlewares)Adds middleware to the agent's processing pipeline. Middlewares run in order, wrapping the run() call.
agent.use(new FilterToolCallsMiddleware(["allowedTool"]));
agent.use((input, next) => {
// Modify input before passing to next
return next.run(input);
});
addMessage(message) / addMessages(messages)Adds messages and notifies subscribers (onNewMessage, onNewToolCall, onMessagesChanged).
setMessages(messages) / setState(state)Replaces messages/state and notifies subscribers.
clone()Creates a deep copy of the agent with the same configuration, messages, state, and middleware.
getCapabilities()Optional method that subclasses can implement to advertise supported capabilities:
async getCapabilities(): Promise<AgentCapabilities> {
return {
identity: { name: "My Agent", type: "custom", version: "1.0.0" },
transport: { streaming: true },
tools: { supported: true, clientProvided: true },
state: { snapshots: true, deltas: true },
humanInTheLoop: { supported: true, approvals: true },
};
}
Concrete agent that connects to a remote HTTP endpoint. Extends AbstractAgent.
interface HttpAgentConfig extends AgentConfig {
url: string; // Agent endpoint URL
headers?: Record<string, string>; // Custom HTTP headers
}
const agent = new HttpAgent({
url: "https://api.example.com/agent",
headers: {
Authorization: "Bearer sk-...",
"X-Custom-Header": "value",
},
threadId: "thread-1",
});
run() sends a POST request to url with RunAgentInput as JSON bodyContent-Type: application/json and Accept: text/event-streamdata: line is parsed through EventSchemas (Zod discriminated union)| Property | Type | Description |
|---|---|---|
url | string | Agent endpoint URL |
headers | Record<string, string> | Custom request headers |
abortController | AbortController | Controls request cancellation |
requestInit(input)Protected method that builds the RequestInit for fetch(). Override for custom request behavior:
class CustomHttpAgent extends HttpAgent {
protected requestInit(input: RunAgentInput): RequestInit {
return {
method: "POST",
headers: {
...this.headers,
"Content-Type": "application/json",
Accept: "text/event-stream",
"X-Request-Id": input.runId,
},
body: JSON.stringify(input),
signal: this.abortController.signal,
};
}
}
abortRun()Aborts the HTTP request via AbortController.abort(). The client auto-generates a RUN_ERROR event with code: "abort".
Interface for receiving typed event callbacks during agent runs. All callbacks are optional and can be sync or async.
interface AgentSubscriber {
// Before events start flowing
onRunInitialized?(
params: AgentSubscriberParams,
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>;
// On unrecoverable error
onRunFailed?(
params: { error: Error } & AgentSubscriberParams,
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>;
// After run completes (success or failure)
onRunFinalized?(
params: AgentSubscriberParams,
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>;
}
Each event type has a corresponding callback. Key ones:
interface AgentSubscriber {
// Catch-all for every event
onEvent?(params: { event: BaseEvent } & AgentSubscriberParams):
MaybePromise<AgentStateMutation | void>;
// Lifecycle events
onRunStartedEvent?(params: { event: RunStartedEvent } & ...): ...;
onRunFinishedEvent?(params: { event: RunFinishedEvent; result?: any } & ...): ...;
onRunErrorEvent?(params: { event: RunErrorEvent } & ...): ...;
onStepStartedEvent?(params: { event: StepStartedEvent } & ...): ...;
onStepFinishedEvent?(params: { event: StepFinishedEvent } & ...): ...;
// Text message events (includes accumulated buffer)
onTextMessageStartEvent?(params: { event: TextMessageStartEvent } & ...): ...;
onTextMessageContentEvent?(params: {
event: TextMessageContentEvent;
textMessageBuffer: string; // Content accumulated so far
} & ...): ...;
onTextMessageEndEvent?(params: {
event: TextMessageEndEvent;
textMessageBuffer: string; // Complete message content
} & ...): ...;
// Tool call events (includes accumulated args)
onToolCallStartEvent?(params: { event: ToolCallStartEvent } & ...): ...;
onToolCallArgsEvent?(params: {
event: ToolCallArgsEvent;
toolCallBuffer: string; // Raw args accumulated
toolCallName: string; // Tool name
partialToolCallArgs: Record<string, any>; // Best-effort parsed args
} & ...): ...;
onToolCallEndEvent?(params: {
event: ToolCallEndEvent;
toolCallName: string;
toolCallArgs: Record<string, any>; // Fully parsed args
} & ...): ...;
onToolCallResultEvent?(params: { event: ToolCallResultEvent } & ...): ...;
// State events
onStateSnapshotEvent?(params: { event: StateSnapshotEvent } & ...): ...;
onStateDeltaEvent?(params: { event: StateDeltaEvent } & ...): ...;
onMessagesSnapshotEvent?(params: { event: MessagesSnapshotEvent } & ...): ...;
// Activity events
onActivitySnapshotEvent?(params: {
event: ActivitySnapshotEvent;
activityMessage?: ActivityMessage;
existingMessage?: Message;
} & ...): ...;
onActivityDeltaEvent?(params: {
event: ActivityDeltaEvent;
activityMessage?: ActivityMessage;
} & ...): ...;
// Reasoning events
onReasoningStartEvent?(params: { event: ReasoningStartEvent } & ...): ...;
onReasoningMessageContentEvent?(params: {
event: ReasoningMessageContentEvent;
reasoningMessageBuffer: string;
} & ...): ...;
onReasoningEndEvent?(params: { event: ReasoningEndEvent } & ...): ...;
onReasoningEncryptedValueEvent?(params: { event: ReasoningEncryptedValueEvent } & ...): ...;
// Custom/raw events
onRawEvent?(params: { event: RawEvent } & ...): ...;
onCustomEvent?(params: { event: CustomEvent } & ...): ...;
// State change notifications (fires after state/messages update)
onMessagesChanged?(params: Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): ...;
onStateChanged?(params: Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): ...;
onNewMessage?(params: { message: Message } & ...): ...;
onNewToolCall?(params: { toolCall: ToolCall } & ...): ...;
}
Subscriber callbacks can return mutations to modify agent state:
interface AgentStateMutation {
messages?: Message[]; // Replace messages
state?: State; // Replace state
stopPropagation?: boolean; // Stop processing this event
}
If stopPropagation is true, the default event application logic is skipped and no further subscribers see the event.
Middleware intercepts the run() call, enabling event transformation, filtering, and augmentation.
abstract class Middleware {
// Override this to intercept runs
abstract run(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
// Helper: runs next agent with chunk transformation
protected runNext(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
// Helper: runs next agent and tracks state after each event
protected runNextWithState(
input: RunAgentInput,
next: AbstractAgent,
): Observable<EventWithState>;
}
interface EventWithState {
event: BaseEvent;
messages: Message[]; // State AFTER event applied
state: any; // State AFTER event applied
}
Use a plain function instead of a class:
agent.use((input: RunAgentInput, next: AbstractAgent) => {
// Modify input
const modifiedInput = {
...input,
forwardedProps: { ...input.forwardedProps, custom: true },
};
// Pass to next agent/middleware
return next.run(modifiedInput);
});
Built-in middleware that filters tool call events to only allowed tool names:
import { FilterToolCallsMiddleware } from "@ag-ui/client";
agent.use(new FilterToolCallsMiddleware(["allowedTool1", "allowedTool2"]));
import { Middleware } from "@ag-ui/client";
import { map } from "rxjs/operators";
class LoggingMiddleware extends Middleware {
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
console.log("Run started with", input.messages.length, "messages");
return this.runNext(input, next).pipe(
map((event) => {
console.log("Event:", event.type);
return event;
}),
);
}
}
class ConditionalMiddleware extends Middleware {
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
return this.runNextWithState(input, next).pipe(
map(({ event, messages, state }) => {
// Access messages and state AFTER the event was applied
console.log("Messages after event:", messages.length);
console.log("State after event:", state);
return event;
}),
);
}
}
The defaultApplyEvents function processes events and updates agent messages/state:
function defaultApplyEvents(
input: RunAgentInput,
events$: Observable<BaseEvent>,
agent: AbstractAgent,
subscribers: AgentSubscriber[],
): Observable<AgentStateMutation>;
| Event | Action |
|---|---|
TEXT_MESSAGE_START | Creates new message in messages array |
TEXT_MESSAGE_CONTENT | Appends delta to message content |
TEXT_MESSAGE_END | Fires onNewMessage subscriber |
TOOL_CALL_START | Creates assistant message with toolCalls array (or adds to existing if parentMessageId matches) |
TOOL_CALL_ARGS | Appends delta to tool call's function.arguments |
TOOL_CALL_END | Fires onNewToolCall subscriber |
TOOL_CALL_RESULT | Adds tool message to messages |
STATE_SNAPSHOT | Replaces entire state |
STATE_DELTA | Applies JSON Patch operations to state |
MESSAGES_SNAPSHOT | Edit-based merge preserving activity messages |
ACTIVITY_SNAPSHOT | Creates or replaces activity message |
ACTIVITY_DELTA | Applies JSON Patch to activity content |
RUN_STARTED | Adds input.messages if present (new messages only) |
REASONING_MESSAGE_START | Creates reasoning message |
REASONING_MESSAGE_CONTENT | Appends delta to reasoning message |
REASONING_ENCRYPTED_VALUE | Sets encryptedValue on target message or tool call |
AG-UI uses RxJS Observables throughout. Key patterns:
import { Observable } from "rxjs";
import { BaseEvent, EventType } from "@ag-ui/core";
// From scratch
const events$ = new Observable<BaseEvent>((observer) => {
observer.next({ type: EventType.RUN_STARTED, threadId: "t1", runId: "r1" });
observer.next({
type: EventType.TEXT_MESSAGE_START,
messageId: "m1",
role: "assistant",
});
observer.next({
type: EventType.TEXT_MESSAGE_CONTENT,
messageId: "m1",
delta: "Hello",
});
observer.next({ type: EventType.TEXT_MESSAGE_END, messageId: "m1" });
observer.next({ type: EventType.RUN_FINISHED, threadId: "t1", runId: "r1" });
observer.complete();
});
const events$ = new Observable<BaseEvent>((observer) => {
(async () => {
try {
observer.next({
type: EventType.RUN_STARTED,
threadId: "t1",
runId: "r1",
});
for await (const chunk of llmStream) {
observer.next({
type: EventType.TEXT_MESSAGE_CONTENT,
messageId: "m1",
delta: chunk,
});
}
observer.next({
type: EventType.RUN_FINISHED,
threadId: "t1",
runId: "r1",
});
observer.complete();
} catch (error) {
observer.next({
type: EventType.RUN_ERROR,
message: error.message,
});
observer.complete();
}
})();
});
HttpAgent.run() calls runHttpRequest(url, requestInit) which returns Observable<HttpEvent>HttpEvent is either HttpHeadersEvent (status + headers) or HttpDataEvent (Uint8Array chunks)transformHttpEventStream() examines the content-type header:
application/x-ag-ui -> protobuf parserparseSSEStream)\n\n, extracts data: lines, parses JSONEventSchemas.parse() (Zod discriminated union)AbortError (from AbortController) is converted to RUN_ERROR with code: "abort"The client automatically applies backward-compatibility middleware:
THINKING_* events to REASONING_* events