packages/docs/runtime/events.mdx
Your agent does a lot behind the scenes. Events let you hook into every step:
The event system enables reactive programming patterns, allowing plugins and services to respond to runtime activities. Events flow through the system providing hooks for custom logic and integrations.
flowchart TD
Source[Event Source] --> Emit[Runtime.emit]
Emit --> Queue[Event Queue]
Queue --> Handlers[Event Handlers]
Handlers --> H1[Handler 1]
Handlers --> H2[Handler 2]
Handlers --> H3[Handler 3]
H1 --> Process[Process Event]
H2 --> Process
H3 --> Process
classDef source fill:#2196f3,color:#fff
classDef system fill:#9c27b0,color:#fff
classDef handlers fill:#4caf50,color:#fff
classDef processing fill:#ff9800,color:#fff
class Source source
class Emit,Queue system
class Handlers,H1,H2,H3 handlers
class Process processing
enum EventType {
// World events
WORLD_JOINED = "world:joined",
WORLD_CONNECTED = "world:connected",
WORLD_LEFT = "world:left",
// Entity events
ENTITY_JOINED = "entity:joined",
ENTITY_LEFT = "entity:left",
ENTITY_UPDATED = "entity:updated",
// Room events
ROOM_JOINED = "room:joined",
ROOM_LEFT = "room:left",
ROOM_UPDATED = "room:updated",
// Message events
MESSAGE_RECEIVED = "message:received",
MESSAGE_SENT = "message:sent",
MESSAGE_DELETED = "message:deleted",
MESSAGE_UPDATED = "message:updated",
// Voice events
VOICE_MESSAGE_RECEIVED = "voice:message:received",
VOICE_MESSAGE_SENT = "voice:message:sent",
VOICE_STARTED = "voice:started",
VOICE_ENDED = "voice:ended",
// Run events
RUN_STARTED = "run:started",
RUN_COMPLETED = "run:completed",
RUN_FAILED = "run:failed",
RUN_TIMEOUT = "run:timeout",
// Action events
ACTION_STARTED = "action:started",
ACTION_COMPLETED = "action:completed",
ACTION_FAILED = "action:failed",
// Evaluator events
EVALUATOR_STARTED = "evaluator:started",
EVALUATOR_COMPLETED = "evaluator:completed",
EVALUATOR_FAILED = "evaluator:failed",
// Model events
MODEL_USED = "model:used",
MODEL_FAILED = "model:failed",
// Service events
SERVICE_STARTED = "service:started",
SERVICE_STOPPED = "service:stopped",
SERVICE_ERROR = "service:error",
}
Each event type has a specific payload structure:
// Message event payload
interface MessagePayload {
runtime: IAgentRuntime;
message: Memory;
room?: Room;
user?: User;
callback?: ResponseCallback;
}
// World event payload
interface WorldPayload {
runtime: IAgentRuntime;
world: World;
metadata?: Record<string, unknown>;
}
// Entity event payload
interface EntityPayload {
runtime: IAgentRuntime;
entity: Entity;
action: "joined" | "left" | "updated";
changes?: Partial<Entity>;
}
// Action event payload
interface ActionPayload {
runtime: IAgentRuntime;
action: Action;
message: Memory;
state: State;
result?: ActionResult;
error?: Error;
}
// Model event payload
interface ModelPayload {
runtime: IAgentRuntime;
modelType: ModelTypeName;
provider: string;
params: Record<string, unknown>;
result?: unknown;
error?: Error;
duration: number;
}
Event handlers are registered during plugin initialization:
const myPlugin: Plugin = {
name: "my-plugin",
events: {
[EventType.MESSAGE_RECEIVED]: [handleMessageReceived, logMessage],
[EventType.ACTION_COMPLETED]: [processActionResult],
[EventType.RUN_COMPLETED]: [cleanupRun],
},
};
// Message handler
async function handleMessageReceived(payload: MessagePayload) {
const { runtime, message, room, user, callback } = payload;
// Process the message
const state = await runtime.composeState(message);
// Check if we should respond
if (shouldRespond(message, state)) {
await runtime.processActions(message, [], state);
}
// Call callback if provided
if (callback) {
await callback({
text: "Message processed",
metadata: { processed: true },
});
}
}
// Action handler
async function processActionResult(payload: ActionPayload) {
const { runtime, action, result, error } = payload;
if (error) {
runtime.logger.error(`Action ${action.name} failed:`, error);
// Handle error
return;
}
// Process successful result
runtime.logger.info(`Action ${action.name} completed:`, result);
// Store result in memory
await runtime.createMemory({
type: MemoryType.ACTION,
content: {
text: `Action ${action.name} completed`,
action: action.name,
result,
},
roomId: payload.message.roomId,
});
}
// Emit an event from the runtime
await runtime.emit(EventType.MESSAGE_RECEIVED, {
runtime,
message,
room,
user,
callback,
});
// Emit from a service
class CustomService extends Service {
async processData(data: Record<string, unknown>) {
await this.runtime.emit(EventType.CUSTOM_EVENT, {
runtime: this.runtime,
data,
timestamp: Date.now(),
});
}
}
// Emit from an action
const customAction: Action = {
name: "CUSTOM_ACTION",
handler: async (runtime, message, state) => {
// Do work
const result = await performAction();
// Emit completion event
await runtime.emit(EventType.ACTION_COMPLETED, {
runtime,
action: customAction,
message,
state,
result,
});
return result;
},
};
// Add listener to runtime
runtime.on(EventType.MESSAGE_RECEIVED, async (payload) => {
console.log("Message received:", payload.message.content.text);
});
// Add multiple listeners
runtime.on(EventType.ACTION_STARTED, logActionStart);
runtime.on(EventType.ACTION_STARTED, trackActionMetrics);
runtime.on(EventType.ACTION_STARTED, notifyActionStart);
// One-time listener
runtime.once(EventType.SERVICE_STARTED, async (payload) => {
console.log("Service started:", payload.service.name);
});
// Remove specific listener
runtime.off(EventType.MESSAGE_RECEIVED, messageHandler);
// Remove all listeners for an event
runtime.removeAllListeners(EventType.MESSAGE_RECEIVED);
// Remove all listeners
runtime.removeAllListeners();
// Emit event and wait for response
async function requestWithResponse(runtime: IAgentRuntime, data: Record<string, unknown>) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Response timeout"));
}, 5000);
// Listen for response
runtime.once(EventType.RESPONSE_RECEIVED, (payload) => {
clearTimeout(timeout);
resolve(payload.response);
});
// Emit request
runtime.emit(EventType.REQUEST_SENT, {
runtime,
data,
requestId: generateId(),
});
});
}
// Chain events for complex workflows
const workflowPlugin: Plugin = {
name: "workflow",
events: {
[EventType.MESSAGE_RECEIVED]: [
async (payload) => {
// Step 1: Process message
const processed = await processMessage(payload);
// Emit next event
await payload.runtime.emit(EventType.MESSAGE_PROCESSED, {
...payload,
processed,
});
},
],
[EventType.MESSAGE_PROCESSED]: [
async (payload) => {
// Step 2: Generate response
const response = await generateResponse(payload);
// Emit next event
await payload.runtime.emit(EventType.RESPONSE_GENERATED, {
...payload,
response,
});
},
],
[EventType.RESPONSE_GENERATED]: [
async (payload) => {
// Step 3: Send response
await sendResponse(payload);
},
],
},
};
// Aggregate multiple events
class EventAggregator {
private events: Map<string, EventHandler[]> = new Map();
private flushInterval: NodeJS.Timer;
constructor(private runtime: IAgentRuntime) {
// Listen for events to aggregate
runtime.on(EventType.MODEL_USED, this.aggregate.bind(this));
// Flush periodically
this.flushInterval = setInterval(() => this.flush(), 60000);
}
private aggregate(payload: ModelPayload) {
const key = `${payload.modelType}:${payload.provider}`;
if (!this.events.has(key)) {
this.events.set(key, []);
}
this.events.get(key).push({
timestamp: Date.now(),
duration: payload.duration,
params: payload.params,
});
}
private async flush() {
for (const [key, events] of this.events.entries()) {
const [modelType, provider] = key.split(":");
// Calculate metrics
const metrics = {
count: events.length,
avgDuration:
events.reduce((sum, e) => sum + e.duration, 0) / events.length,
totalDuration: events.reduce((sum, e) => sum + e.duration, 0),
};
// Emit aggregated event
await this.runtime.emit(EventType.METRICS_AGGREGATED, {
runtime: this.runtime,
modelType,
provider,
metrics,
period: 60000,
});
}
// Clear events
this.events.clear();
}
stop() {
clearInterval(this.flushInterval);
}
}
// Extend EventType with custom events
declare module "@elizaos/core" {
interface EventTypeRegistry {
CUSTOM_DATA_RECEIVED: "custom:data:received";
CUSTOM_PROCESS_COMPLETE: "custom:process:complete";
CUSTOM_ERROR_OCCURRED: "custom:error:occurred";
}
}
// Define custom payload
interface CustomDataPayload {
runtime: IAgentRuntime;
data: Record<string, unknown>;
source: string;
timestamp: number;
}
const customPlugin: Plugin = {
name: "custom-plugin",
events: {
"custom:data:received": [
async (payload: CustomDataPayload) => {
// Process custom data
const processed = await processCustomData(payload.data);
// Emit completion
await payload.runtime.emit("custom:process:complete", {
runtime: payload.runtime,
original: payload.data,
processed,
duration: Date.now() - payload.timestamp,
});
},
],
},
actions: [
{
name: "RECEIVE_DATA",
handler: async (runtime, message, state) => {
// Emit custom event
await runtime.emit("custom:data:received", {
runtime,
data: message.content,
source: "user",
timestamp: Date.now(),
});
},
},
],
};
// Middleware to log all events
function loggingMiddleware(eventType: EventType, payload: EventPayload) {
console.log(`[Event] ${eventType}:`, {
timestamp: new Date().toISOString(),
type: eventType,
payload: JSON.stringify(payload).slice(0, 200),
});
}
// Middleware to filter events
function filterMiddleware(allowedEvents: EventType[]) {
return (eventType: EventType, payload: EventPayload, next: () => void) => {
if (allowedEvents.includes(eventType)) {
next();
}
};
}
// Middleware to transform payload
function transformMiddleware(
eventType: EventType,
payload: EventPayload,
next: (transformed: EventPayload) => void,
) {
const transformed = {
...payload,
timestamp: Date.now(),
eventType,
};
next(transformed);
}
// Global error handler for events
runtime.on("error", (error: Error, eventType: EventType, payload: EventPayload) => {
console.error(`Error in event ${eventType}:`, error);
// Log to monitoring service
monitoringService.logError({
error: error.message,
stack: error.stack,
eventType,
payload: JSON.stringify(payload).slice(0, 1000),
});
});
// Handler with error handling
async function safeEventHandler(payload: MessagePayload) {
try {
await riskyOperation(payload);
} catch (error) {
// Emit error event
await payload.runtime.emit(EventType.SERVICE_ERROR, {
runtime: payload.runtime,
error,
originalEvent: payload,
});
// Don't throw - allow other handlers to run
}
}
class EventBatcher {
private batch: Map<EventType, EventPayload[]> = new Map();
private batchSize = 100;
private flushInterval = 1000;
private timer: NodeJS.Timer;
constructor(private runtime: IAgentRuntime) {
this.timer = setInterval(() => this.flush(), this.flushInterval);
}
add(eventType: EventType, payload: EventPayload) {
if (!this.batch.has(eventType)) {
this.batch.set(eventType, []);
}
const events = this.batch.get(eventType);
events.push(payload);
if (events.length >= this.batchSize) {
this.flushType(eventType);
}
}
private flushType(eventType: EventType) {
const events = this.batch.get(eventType);
if (!events || events.length === 0) return;
// Emit batch event
this.runtime.emit(`${eventType}:batch` as EventType, {
runtime: this.runtime,
events,
count: events.length,
});
this.batch.set(eventType, []);
}
flush() {
for (const eventType of this.batch.keys()) {
this.flushType(eventType);
}
}
stop() {
clearInterval(this.timer);
this.flush();
}
}
// Throttle high-frequency events
function throttleEvents(eventType: EventType, delay: number) {
let lastEmit = 0;
let pending: EventPayload | null = null;
let timer: NodeJS.Timeout | null = null;
return (payload: EventPayload) => {
const now = Date.now();
if (now - lastEmit >= delay) {
// Emit immediately
runtime.emit(eventType, payload);
lastEmit = now;
} else {
// Store for later
pending = payload;
// Schedule emit
if (!timer) {
timer = setTimeout(
() => {
if (pending) {
runtime.emit(eventType, pending);
lastEmit = Date.now();
pending = null;
}
timer = null;
},
delay - (now - lastEmit),
);
}
}
};
}