multimodal/websites/tarko/docs/en/guide/advanced/agent-protocol.mdx
Tarko's Agent Protocol defines standardized formats for agent communication, enabling seamless integration between components and making Context Engineering more manageable.
The Agent Protocol consists of two main components:
The Event Stream is the core communication mechanism within Tarko agents.
All events follow a consistent structure:
interface AgentEvent {
id: string; // Unique event identifier
type: string; // Event type
timestamp: number; // Unix timestamp
sessionId: string; // Session identifier
data: any; // Event-specific data
metadata?: { // Optional metadata
source?: string; // Event source component
version?: string; // Protocol version
traceId?: string; // Distributed tracing ID
};
}
// User message
{
type: 'user_message',
data: {
content: string | MessageContent[]; // Text or multimodal content
role: 'user';
}
}
// User query (alias for user_message)
{
type: 'user_query',
data: {
query: string | MessageContent[];
}
}
// Assistant message start
{
type: 'assistant_message_start',
data: {
role: 'assistant';
}
}
// Assistant message delta (streaming)
{
type: 'assistant_message_delta',
data: {
delta: string;
accumulated?: string; // Full content so far
}
}
// Assistant message complete
{
type: 'assistant_message',
data: {
content: string;
role: 'assistant';
finishReason: 'stop' | 'length' | 'tool_calls';
}
}
// Tool call initiated
{
type: 'tool_call',
data: {
id: string;
name: string;
arguments: Record<string, any>;
type: 'function';
}
}
// Tool execution start
{
type: 'tool_execution_start',
data: {
toolCallId: string;
name: string;
}
}
// Tool execution delta (streaming tools)
{
type: 'tool_execution_delta',
data: {
toolCallId: string;
delta: string;
accumulated?: string;
}
}
// Tool execution result
{
type: 'tool_result',
data: {
toolCallId: string;
name: string;
content: string;
success: boolean;
error?: string;
}
}
// Session started
{
type: 'session_start',
data: {
sessionId: string;
agentName: string;
timestamp: number;
}
}
// Session ended
{
type: 'session_end',
data: {
sessionId: string;
duration: number;
messageCount: number;
}
}
// Error occurred
{
type: 'error',
data: {
error: string;
code?: string;
details?: any;
recoverable: boolean;
}
}
// Context compression
{
type: 'context_compressed',
data: {
originalLength: number;
compressedLength: number;
compressionRatio: number;
strategy: string;
}
}
import { Agent } from '@tarko/agent';
const agent = new Agent({
// ... configuration
});
// Listen to specific event types
agent.on('assistant_message', (event) => {
console.log('Assistant response:', event.data.content);
});
agent.on('tool_call', (event) => {
console.log('Tool called:', event.data.name, event.data.arguments);
});
agent.on('error', (event) => {
console.error('Agent error:', event.data.error);
});
// Listen to all events
agent.on('*', (event) => {
console.log('Event:', event.type, event.data);
});
// Emit custom events from tools or hooks
agent.emit({
type: 'custom_metric',
data: {
metric: 'response_time',
value: 1250,
unit: 'ms'
}
});
// Filter events by type
const toolEvents = agent.getEventStream()
.filter(event => event.type.startsWith('tool_'));
// Filter events by session
const sessionEvents = agent.getEventStream()
.filter(event => event.sessionId === 'specific-session');
// Filter events by time range
const recentEvents = agent.getEventStream()
.filter(event => event.timestamp > Date.now() - 3600000); // Last hour
The Server Protocol defines HTTP/SSE/WebSocket APIs for external integration.
# Create session
POST /api/v1/sessions/create
Content-Type: application/json
{
"name": "My Session",
"tags": ["tag1", "tag2"]
}
# Response
{
"sessionId": "sess_123",
"createdAt": 1622548800000
}
# Execute query
POST /api/v1/sessions/query
Content-Type: application/json
{
"sessionId": "sess_123",
"query": "Hello, how can you help?"
}
# Response
{
"response": "Hello! I'm here to help...",
"events": [
{
"type": "user_message",
"data": { "content": "Hello, how can you help?" }
},
{
"type": "assistant_message",
"data": { "content": "Hello! I'm here to help..." }
}
]
}
# Get session events
GET /api/v1/sessions/events?sessionId=sess_123&limit=50&offset=0
# Response
{
"events": [
{
"id": "evt_1",
"type": "user_message",
"timestamp": 1622548800000,
"sessionId": "sess_123",
"data": { "content": "Hello" }
}
],
"total": 125,
"hasMore": true
}
For real-time streaming:
# Streaming query
POST /api/v1/sessions/query/stream
Content-Type: application/json
Accept: text/event-stream
{
"sessionId": "sess_123",
"query": "Tell me a story"
}
SSE Response:
event: user_message
data: {"id":"evt_1","type":"user_message","data":{"content":"Tell me a story"}}
event: assistant_message_start
data: {"id":"evt_2","type":"assistant_message_start","data":{}}
event: assistant_message_delta
data: {"id":"evt_3","type":"assistant_message_delta","data":{"delta":"Once upon"}}
event: assistant_message_delta
data: {"id":"evt_4","type":"assistant_message_delta","data":{"delta":" a time"}}
event: assistant_message
data: {"id":"evt_5","type":"assistant_message","data":{"content":"Once upon a time..."}}
For bidirectional real-time communication:
const socket = new WebSocket('ws://localhost:8888/ws');
// Connect to session
socket.send(JSON.stringify({
type: 'join_session',
sessionId: 'sess_123'
}));
// Send query
socket.send(JSON.stringify({
type: 'send_query',
sessionId: 'sess_123',
query: 'Hello!'
}));
// Receive events
socket.onmessage = (event) => {
const agentEvent = JSON.parse(event.data);
console.log('Received:', agentEvent.type, agentEvent.data);
};
// Abort query
socket.send(JSON.stringify({
type: 'abort_query',
sessionId: 'sess_123'
}));
Define custom events for your application:
// Define custom event types
interface CustomEvents {
'user_feedback': {
rating: number;
comment?: string;
};
'tool_performance': {
toolName: string;
executionTime: number;
success: boolean;
};
}
// Use with type safety
agent.on('user_feedback', (event) => {
console.log('User rating:', event.data.rating);
});
agent.emit({
type: 'tool_performance',
data: {
toolName: 'web_search',
executionTime: 1250,
success: true
}
});
// Specify protocol version
const agent = new Agent({
protocol: {
version: '1.0',
extensions: ['custom-events', 'performance-metrics']
}
});
// Version-aware event handling
agent.on('*', (event) => {
const version = event.metadata?.version || '1.0';
if (version === '1.0') {
// Handle v1.0 events
} else if (version === '2.0') {
// Handle v2.0 events
}
});
import { useEffect, useState } from 'react';
function AgentChat({ sessionId }: { sessionId: string }) {
const [events, setEvents] = useState<AgentEvent[]>([]);
const [isLoading, setIsLoading] = useState(false);
useEffect(() => {
const eventSource = new EventSource(
`/api/v1/sessions/events/stream?sessionId=${sessionId}`
);
eventSource.onmessage = (event) => {
const agentEvent = JSON.parse(event.data);
setEvents(prev => [...prev, agentEvent]);
if (agentEvent.type === 'assistant_message') {
setIsLoading(false);
}
};
return () => eventSource.close();
}, [sessionId]);
const sendMessage = async (message: string) => {
setIsLoading(true);
await fetch('/api/v1/sessions/query', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sessionId, query: message })
});
};
return (
<div>
{events.map(event => (
<EventDisplay key={event.id} event={event} />
))}
{isLoading && <LoadingIndicator />}
<MessageInput onSend={sendMessage} />
</div>
);
}
import { Agent } from '@tarko/agent';
import chalk from 'chalk';
const agent = new Agent({ /* config */ });
// Display events in CLI
agent.on('user_message', (event) => {
console.log(chalk.blue('User:'), event.data.content);
});
agent.on('assistant_message_delta', (event) => {
process.stdout.write(chalk.green(event.data.delta));
});
agent.on('assistant_message', (event) => {
console.log(); // New line after complete message
});
agent.on('tool_call', (event) => {
console.log(chalk.yellow(`🔧 Using ${event.data.name}...`));
});
agent.on('tool_result', (event) => {
if (event.data.success) {
console.log(chalk.green('✅ Tool completed'));
} else {
console.log(chalk.red('❌ Tool failed:'), event.data.error);
}
});
agent.on('error', (event) => {
console.error(chalk.red('Error:'), event.data.error);
});
import { Agent } from '@tarko/agent';
import { createPrometheusMetrics } from './metrics';
const metrics = createPrometheusMetrics();
const agent = new Agent({ /* config */ });
// Track metrics from events
agent.on('assistant_message', (event) => {
metrics.responseCount.inc();
metrics.responseLength.observe(event.data.content.length);
});
agent.on('tool_call', (event) => {
metrics.toolCallCount.inc({ tool: event.data.name });
});
agent.on('tool_result', (event) => {
const duration = Date.now() - event.timestamp;
metrics.toolExecutionDuration.observe(
{ tool: event.data.name, success: event.data.success },
duration
);
});
agent.on('error', (event) => {
metrics.errorCount.inc({ type: event.data.code || 'unknown' });
});
// Enable debug logging
const agent = new Agent({
debug: {
events: true,
level: 'verbose'
}
});
// Export event stream for analysis
const events = agent.getEventHistory();
fs.writeFileSync('events.json', JSON.stringify(events, null, 2));
// Real-time event monitoring
agent.on('*', (event) => {
if (process.env.NODE_ENV === 'development') {
console.log(`[${event.type}]`, event.data);
}
});
import { validateEvent, EventSchema } from '@tarko/agent';
// Validate events against schema
agent.on('*', (event) => {
const validation = validateEvent(event, EventSchema);
if (!validation.valid) {
console.warn('Invalid event:', validation.errors);
}
});