Agent Protocol

multimodal/websites/tarko/docs/en/guide/advanced/agent-protocol.mdx

Tarko's Agent Protocol defines standard formats for communication between agent components and with external clients. A shared format simplifies integration and makes Context Engineering more manageable.

Overview

The Agent Protocol consists of two main components:

  1. Event Stream: Internal communication between agent components
  2. Server Protocol: HTTP/SSE/WebSocket APIs for external integration

Event Stream Protocol

The Event Stream is the core communication mechanism within Tarko agents.

Event Structure

All events follow a consistent structure:

typescript
interface AgentEvent {
  id: string;          // Unique event identifier
  type: string;        // Event type, e.g. 'user_message'
  timestamp: number;   // Unix timestamp in milliseconds
  sessionId: string;   // Session identifier
  data: any;           // Event-specific payload
  metadata?: {         // Optional metadata
    source?: string;   // Event source component
    version?: string;  // Protocol version
    traceId?: string;  // Distributed tracing ID
  };
}
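
As a minimal sketch of how a producer might fill in this envelope, the helper below wraps a payload with an ID, a timestamp, and a session ID. `createEvent` and the counter-based ID are hypothetical and not part of the Tarko API; the interface is repeated (with a generic payload type) so the snippet is self-contained.

```typescript
interface EventMetadata {
  source?: string;
  version?: string;
  traceId?: string;
}

interface AgentEvent<T = unknown> {
  id: string;
  type: string;
  timestamp: number;
  sessionId: string;
  data: T;
  metadata?: EventMetadata;
}

// Placeholder ID source; a real producer would use a proper unique ID generator.
let nextEventNumber = 0;

// Hypothetical helper: fills in the envelope fields around a payload.
function createEvent<T>(
  type: string,
  sessionId: string,
  data: T,
  metadata?: EventMetadata,
): AgentEvent<T> {
  nextEventNumber += 1;
  const event: AgentEvent<T> = {
    id: `evt_${nextEventNumber}`,
    type,
    timestamp: Date.now(), // assumption: milliseconds since the Unix epoch
    sessionId,
    data,
  };
  if (metadata) event.metadata = metadata; // only attach metadata when provided
  return event;
}

const greeting = createEvent('user_message', 'sess_123', {
  content: 'Hello',
  role: 'user' as const,
});
console.log(greeting.id, greeting.type, greeting.sessionId);
```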

Core Event Types

User Events

typescript
// User message
{
  type: 'user_message',
  data: {
    content: string | MessageContent[]; // Text or multimodal content
    role: 'user';
  }
}

// User query (alias for user_message)
{
  type: 'user_query',
  data: {
    query: string | MessageContent[];
  }
}

Assistant Events

typescript
// Assistant message start
{
  type: 'assistant_message_start',
  data: {
    role: 'assistant';
  }
}

// Assistant message delta (streaming)
{
  type: 'assistant_message_delta',
  data: {
    delta: string;
    accumulated?: string; // Full content so far
  }
}

// Assistant message complete
{
  type: 'assistant_message',
  data: {
    content: string;
    role: 'assistant';
    finishReason: 'stop' | 'length' | 'tool_calls';
  }
}

Tool Events

typescript
// Tool call initiated
{
  type: 'tool_call',
  data: {
    id: string;
    name: string;
    arguments: Record<string, any>;
    type: 'function';
  }
}

// Tool execution start
{
  type: 'tool_execution_start',
  data: {
    toolCallId: string;
    name: string;
  }
}

// Tool execution delta (streaming tools)
{
  type: 'tool_execution_delta',
  data: {
    toolCallId: string;
    delta: string;
    accumulated?: string;
  }
}

// Tool execution result
{
  type: 'tool_result',
  data: {
    toolCallId: string;
    name: string;
    content: string;
    success: boolean;
    error?: string;
  }
}

System Events

typescript
// Session started
{
  type: 'session_start',
  data: {
    sessionId: string;
    agentName: string;
    timestamp: number;
  }
}

// Session ended
{
  type: 'session_end',
  data: {
    sessionId: string;
    duration: number;
    messageCount: number;
  }
}

// Error occurred
{
  type: 'error',
  data: {
    error: string;
    code?: string;
    details?: any;
    recoverable: boolean;
  }
}

// Context compression
{
  type: 'context_compressed',
  data: {
    originalLength: number;
    compressedLength: number;
    compressionRatio: number;
    strategy: string;
  }
}
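
Because every event carries a literal `type`, the shapes above map naturally onto a TypeScript discriminated union. The sketch below models a subset of them and shows how narrowing on `type` exposes the matching `data` fields. `CoreEvent` and `describeEvent` are illustrative names, not exports of `@tarko/agent`.

```typescript
// A subset of the event shapes above, modelled as a discriminated union.
type CoreEvent =
  | { type: 'assistant_message_delta'; data: { delta: string; accumulated?: string } }
  | {
      type: 'tool_call';
      data: { id: string; name: string; arguments: Record<string, unknown>; type: 'function' };
    }
  | {
      type: 'tool_result';
      data: { toolCallId: string; name: string; content: string; success: boolean; error?: string };
    }
  | { type: 'error'; data: { error: string; code?: string; details?: unknown; recoverable: boolean } };

// Narrowing on `type` gives each branch the matching `data` shape.
function describeEvent(event: CoreEvent): string {
  switch (event.type) {
    case 'assistant_message_delta':
      return `delta: ${event.data.delta}`;
    case 'tool_call':
      return `call ${event.data.name}(${JSON.stringify(event.data.arguments)})`;
    case 'tool_result':
      return event.data.success
        ? `result from ${event.data.name}`
        : `failure in ${event.data.name}: ${event.data.error ?? 'unknown'}`;
    case 'error':
      return `${event.data.recoverable ? 'recoverable' : 'fatal'} error: ${event.data.error}`;
    default: {
      // Exhaustiveness check: this assignment stops compiling if a variant is unhandled.
      const unreachable: never = event;
      return unreachable;
    }
  }
}

console.log(
  describeEvent({
    type: 'tool_call',
    data: { id: 'call_1', name: 'web_search', arguments: { q: 'tarko' }, type: 'function' },
  }),
);
```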

Event Stream Usage

Listening to Events

typescript
import { Agent } from '@tarko/agent';

const agent = new Agent({
  // ... configuration
});

// Listen to specific event types
agent.on('assistant_message', (event) => {
  console.log('Assistant response:', event.data.content);
});

agent.on('tool_call', (event) => {
  console.log('Tool called:', event.data.name, event.data.arguments);
});

agent.on('error', (event) => {
  console.error('Agent error:', event.data.error);
});

// Listen to all events
agent.on('*', (event) => {
  console.log('Event:', event.type, event.data);
});

Emitting Custom Events

typescript
// Emit custom events from tools or hooks
agent.emit({
  type: 'custom_metric',
  data: {
    metric: 'response_time',
    value: 1250,
    unit: 'ms'
  }
});

Event Filtering

typescript
// Filter events by type
const toolEvents = agent.getEventStream()
  .filter(event => event.type.startsWith('tool_'));

// Filter events by session
const sessionEvents = agent.getEventStream()
  .filter(event => event.sessionId === 'specific-session');

// Filter events by time range
const recentEvents = agent.getEventStream()
  .filter(event => event.timestamp > Date.now() - 3600000); // Last hour
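
Filtering combines naturally with a reduction. The sketch below rebuilds the assistant's text for one session from its streamed deltas. It operates on a plain array and assumes the result of `getEventStream()` can be treated as one, which the examples above suggest but do not confirm; `collectAssistantText` is a hypothetical helper.

```typescript
interface StreamEvent {
  type: string;
  sessionId: string;
  data: { delta?: string; [key: string]: unknown };
}

// Concatenate all assistant deltas for one session, in arrival order.
function collectAssistantText(events: StreamEvent[], sessionId: string): string {
  return events
    .filter((e) => e.sessionId === sessionId && e.type === 'assistant_message_delta')
    .map((e) => e.data.delta ?? '')
    .join('');
}

const sample: StreamEvent[] = [
  { type: 'assistant_message_start', sessionId: 'sess_123', data: {} },
  { type: 'assistant_message_delta', sessionId: 'sess_123', data: { delta: 'Once upon' } },
  { type: 'assistant_message_delta', sessionId: 'sess_999', data: { delta: 'ignored' } },
  { type: 'assistant_message_delta', sessionId: 'sess_123', data: { delta: ' a time' } },
];

console.log(collectAssistantText(sample, 'sess_123')); // "Once upon a time"
```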

Server Protocol

The Server Protocol defines HTTP/SSE/WebSocket APIs for external integration.

HTTP REST API

Session Management

http
# Create session
POST /api/v1/sessions/create
Content-Type: application/json

{
  "name": "My Session",
  "tags": ["tag1", "tag2"]
}

# Response
{
  "sessionId": "sess_123",
  "createdAt": 1622548800000
}

http
# Execute query
POST /api/v1/sessions/query
Content-Type: application/json

{
  "sessionId": "sess_123",
  "query": "Hello, how can you help?"
}

# Response
{
  "response": "Hello! I'm here to help...",
  "events": [
    {
      "type": "user_message",
      "data": { "content": "Hello, how can you help?" }
    },
    {
      "type": "assistant_message",
      "data": { "content": "Hello! I'm here to help..." }
    }
  ]
}

Event History

http
# Get session events
GET /api/v1/sessions/events?sessionId=sess_123&limit=50&offset=0

# Response
{
  "events": [
    {
      "id": "evt_1",
      "type": "user_message",
      "timestamp": 1622548800000,
      "sessionId": "sess_123",
      "data": { "content": "Hello" }
    }
  ],
  "total": 125,
  "hasMore": true
}
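
The `limit`, `offset`, and `hasMore` fields are enough to walk a full session history page by page. The following is a sketch under the assumption that `offset` counts events rather than pages; `buildEventsUrl`, `nextOffset`, and `fetchAllEvents` are hypothetical helpers, and the HTTP call is injected so any client can be used.

```typescript
interface EventsPage {
  events: unknown[];
  total: number;
  hasMore: boolean;
}

// Build the URL for one page of session events.
function buildEventsUrl(sessionId: string, limit: number, offset: number): string {
  const id = encodeURIComponent(sessionId);
  return `/api/v1/sessions/events?sessionId=${id}&limit=${limit}&offset=${offset}`;
}

// Return the offset of the next page, or null when the history is exhausted.
function nextOffset(page: EventsPage, offset: number): number | null {
  if (!page.hasMore || page.events.length === 0) return null;
  return offset + page.events.length;
}

// Walk the whole history; `fetchPage` performs the actual GET request.
async function fetchAllEvents(
  sessionId: string,
  fetchPage: (url: string) => Promise<EventsPage>,
  limit = 50,
): Promise<unknown[]> {
  const all: unknown[] = [];
  let offset: number | null = 0;
  while (offset !== null) {
    const page = await fetchPage(buildEventsUrl(sessionId, limit, offset));
    all.push(...page.events);
    offset = nextOffset(page, offset);
  }
  return all;
}
```

The empty-page guard in `nextOffset` keeps the loop from spinning if a server reports `hasMore: true` but returns no events.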

Server-Sent Events (SSE)

For real-time streaming:

http
# Streaming query
POST /api/v1/sessions/query/stream
Content-Type: application/json
Accept: text/event-stream

{
  "sessionId": "sess_123",
  "query": "Tell me a story"
}

SSE Response:

event: user_message
data: {"id":"evt_1","type":"user_message","data":{"content":"Tell me a story"}}

event: assistant_message_start
data: {"id":"evt_2","type":"assistant_message_start","data":{}}

event: assistant_message_delta
data: {"id":"evt_3","type":"assistant_message_delta","data":{"delta":"Once upon"}}

event: assistant_message_delta
data: {"id":"evt_4","type":"assistant_message_delta","data":{"delta":" a time"}}

event: assistant_message
data: {"id":"evt_5","type":"assistant_message","data":{"content":"Once upon a time..."}}
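
Because this endpoint takes a POST body, the browser `EventSource` API, which only issues GET requests, cannot call it directly; a client would typically read the response body itself and split it into frames. The parser below is a minimal sketch of that step. It handles only the `event:` and `data:` fields and assumes each frame is complete and terminated by a blank line. `parseSseFrames` is an illustrative name.

```typescript
interface SseFrame {
  event: string;
  data: unknown;
}

// Parse complete SSE frames (blank-line separated) into objects.
function parseSseFrames(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = 'message'; // SSE default when no `event:` field is present
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('event:')) event = line.slice(6).trim();
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim());
    }
    // Frames without data (including the trailing empty block) are skipped.
    if (dataLines.length > 0) {
      frames.push({ event, data: JSON.parse(dataLines.join('\n')) });
    }
  }
  return frames;
}

const raw = [
  'event: assistant_message_delta',
  'data: {"id":"evt_3","type":"assistant_message_delta","data":{"delta":"Once upon"}}',
  '',
  'event: assistant_message_delta',
  'data: {"id":"evt_4","type":"assistant_message_delta","data":{"delta":" a time"}}',
  '',
  '',
].join('\n');

const frames = parseSseFrames(raw);
console.log(frames.length); // 2
```

A streaming client would additionally need to buffer partial chunks until a blank line arrives before handing text to a parser like this.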

WebSocket Protocol

For bidirectional real-time communication:

javascript
const socket = new WebSocket('ws://localhost:8888/ws');

// Receive events (registered first so no early events are missed)
socket.onmessage = (event) => {
  const agentEvent = JSON.parse(event.data);
  console.log('Received:', agentEvent.type, agentEvent.data);
};

// send() throws while the socket is still connecting, so wait for 'open'
socket.onopen = () => {
  // Connect to session
  socket.send(JSON.stringify({
    type: 'join_session',
    sessionId: 'sess_123'
  }));

  // Send query
  socket.send(JSON.stringify({
    type: 'send_query',
    sessionId: 'sess_123',
    query: 'Hello!'
  }));
};

// Abort query (call while a query is running, e.g. from a cancel button)
function abortQuery() {
  socket.send(JSON.stringify({
    type: 'abort_query',
    sessionId: 'sess_123'
  }));
}
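
The three client-to-server messages in the example above can be typed so that a malformed message is rejected at compile time. This is a sketch only: the message shapes are copied from the example, while `ClientMessage`, `Sender`, and `sendMessage` are hypothetical names.

```typescript
// Client-to-server messages taken from the WebSocket example.
type ClientMessage =
  | { type: 'join_session'; sessionId: string }
  | { type: 'send_query'; sessionId: string; query: string }
  | { type: 'abort_query'; sessionId: string };

// Anything with a send(string) method, such as a browser WebSocket.
interface Sender {
  send(payload: string): void;
}

// Serialize a typed message and hand it to the socket.
function sendMessage(socket: Sender, message: ClientMessage): void {
  socket.send(JSON.stringify(message));
}

// A fake socket makes the helper easy to exercise without a server.
const sent: string[] = [];
const fakeSocket: Sender = {
  send: (payload) => {
    sent.push(payload);
  },
};

sendMessage(fakeSocket, { type: 'join_session', sessionId: 'sess_123' });
sendMessage(fakeSocket, { type: 'send_query', sessionId: 'sess_123', query: 'Hello!' });
console.log(sent);
```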

Protocol Extensions

Custom Event Types

Define custom events for your application:

typescript
// Define custom event types
interface CustomEvents {
  'user_feedback': {
    rating: number;
    comment?: string;
  };
  'tool_performance': {
    toolName: string;
    executionTime: number;
    success: boolean;
  };
}

// Use with type safety
agent.on('user_feedback', (event) => {
  console.log('User rating:', event.data.rating);
});

agent.emit({
  type: 'tool_performance',
  data: {
    toolName: 'web_search',
    executionTime: 1250,
    success: true
  }
});
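
The snippet above declares `CustomEvents` but does not show how the map is tied to `agent.on` and `agent.emit`. One way such a binding could work is a generic emitter keyed by the event map. `TypedEmitter` below is a hypothetical, self-contained sketch, not the Tarko implementation.

```typescript
interface CustomEvents {
  user_feedback: { rating: number; comment?: string };
  tool_performance: { toolName: string; executionTime: number; success: boolean };
}

type Handler<T> = (event: { type: string; data: T }) => void;

class TypedEmitter<Events extends object> {
  private handlers = new Map<keyof Events, Handler<any>[]>();

  // Register a handler whose payload type is derived from the event name.
  on<K extends keyof Events & string>(type: K, handler: Handler<Events[K]>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Emit an event; `data` must match the payload declared for `type`.
  emit<K extends keyof Events & string>(event: { type: K; data: Events[K] }): void {
    for (const handler of this.handlers.get(event.type) ?? []) {
      handler(event);
    }
  }
}

const emitter = new TypedEmitter<CustomEvents>();
const ratings: number[] = [];

emitter.on('user_feedback', (event) => {
  ratings.push(event.data.rating); // `rating` is known to be a number here
});

emitter.emit({ type: 'user_feedback', data: { rating: 5 } });
// emitter.emit({ type: 'user_feedback', data: { rating: 'high' } }); // would not compile
console.log(ratings);
```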

Protocol Versioning

typescript
// Specify protocol version
const agent = new Agent({
  protocol: {
    version: '1.0',
    extensions: ['custom-events', 'performance-metrics']
  }
});

// Version-aware event handling
agent.on('*', (event) => {
  const version = event.metadata?.version || '1.0';
  
  if (version === '1.0') {
    // Handle v1.0 events
  } else if (version === '2.0') {
    // Handle v2.0 events
  }
});
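
Comparing version strings for equality stops working as soon as a minor release such as '1.1' appears. A hedged alternative is to parse the version and branch on its major number. Both the `major.minor` format and the rule that only the major number signals breaking changes are assumptions here, not documented Tarko behavior; `parseVersion` and `isSupported` are hypothetical helpers.

```typescript
interface ProtocolVersion {
  major: number;
  minor: number;
}

// Parse "major.minor"; a missing or malformed value falls back to 1.0,
// mirroring the default used in the handler above.
function parseVersion(raw: string | undefined): ProtocolVersion {
  const match = /^(\d+)\.(\d+)$/.exec(raw ?? '');
  if (!match) return { major: 1, minor: 0 };
  return { major: Number(match[1]), minor: Number(match[2]) };
}

// Assumption: an event is compatible when its major version is supported.
function isSupported(raw: string | undefined, supportedMajors: number[]): boolean {
  const { major } = parseVersion(raw);
  return supportedMajors.some((supported) => supported === major);
}

console.log(isSupported('1.4', [1])); // true
console.log(isSupported('2.0', [1])); // false
```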

Integration Examples

React Integration

typescript
import { useEffect, useState } from 'react';

function AgentChat({ sessionId }: { sessionId: string }) {
  const [events, setEvents] = useState<AgentEvent[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/sessions/events/stream?sessionId=${sessionId}`
    );
    
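    // Note: onmessage only fires for frames without an `event:` field.
    // If the server names its events, register a listener per type with
    // eventSource.addEventListener(<type>, handler) instead.
    eventSource.onmessage = (event) => {
      const agentEvent: AgentEvent = JSON.parse(event.data);
      setEvents(prev => [...prev, agentEvent]);
      
      // Stop the loading indicator once the turn completes or fails
      if (agentEvent.type === 'assistant_message' || agentEvent.type === 'error') {
        setIsLoading(false);
      }
    };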
    
    return () => eventSource.close();
  }, [sessionId]);
  
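  const sendMessage = async (message: string) => {
    setIsLoading(true);
    
    try {
      const response = await fetch('/api/v1/sessions/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ sessionId, query: message })
      });
      if (!response.ok) {
        throw new Error(`Query failed with status ${response.status}`);
      }
    } catch (error) {
      console.error(error);
      setIsLoading(false); // Do not leave the indicator spinning on failure
    }
  };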
  
  return (
    <div>
      {events.map(event => (
        <EventDisplay key={event.id} event={event} />
      ))}
      {isLoading && <LoadingIndicator />}
      <MessageInput onSend={sendMessage} />
    </div>
  );
}

CLI Integration

typescript
import { Agent } from '@tarko/agent';
import chalk from 'chalk';

const agent = new Agent({ /* config */ });

// Display events in CLI
agent.on('user_message', (event) => {
  console.log(chalk.blue('User:'), event.data.content);
});

agent.on('assistant_message_delta', (event) => {
  process.stdout.write(chalk.green(event.data.delta));
});

agent.on('assistant_message', (event) => {
  console.log(); // New line after complete message
});

agent.on('tool_call', (event) => {
  console.log(chalk.yellow(`🔧 Using ${event.data.name}...`));
});

agent.on('tool_result', (event) => {
  if (event.data.success) {
    console.log(chalk.green('✅ Tool completed'));
  } else {
    console.log(chalk.red('❌ Tool failed:'), event.data.error);
  }
});

agent.on('error', (event) => {
  console.error(chalk.red('Error:'), event.data.error);
});

Monitoring Integration

typescript
import { Agent } from '@tarko/agent';
import { createPrometheusMetrics } from './metrics';

const metrics = createPrometheusMetrics();
const agent = new Agent({ /* config */ });

// Track metrics from events
agent.on('assistant_message', (event) => {
  metrics.responseCount.inc();
  metrics.responseLength.observe(event.data.content.length);
});

agent.on('tool_call', (event) => {
  metrics.toolCallCount.inc({ tool: event.data.name });
});

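// Record when each tool execution starts, keyed by tool call ID
const toolStartTimes = new Map<string, number>();

agent.on('tool_execution_start', (event) => {
  toolStartTimes.set(event.data.toolCallId, event.timestamp);
});

agent.on('tool_result', (event) => {
  const startedAt = toolStartTimes.get(event.data.toolCallId);
  toolStartTimes.delete(event.data.toolCallId);
  if (startedAt === undefined) return; // No matching start event was seen

  // Duration is the gap between the start event and the result event
  metrics.toolExecutionDuration.observe(
    { tool: event.data.name, success: String(event.data.success) },
    event.timestamp - startedAt
  );
});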

agent.on('error', (event) => {
  metrics.errorCount.inc({ type: event.data.code || 'unknown' });
});

Best Practices

1. Event Design

  • Use consistent event naming conventions
  • Include all necessary data in event payload
  • Add metadata for debugging and tracing
  • Version your event schemas

2. Error Handling

  • Always include error context in error events
  • Use structured error codes
  • Provide actionable error messages
  • Implement proper error recovery

3. Performance

  • Batch events when possible
  • Use appropriate event filtering
  • Implement event compression for large payloads
  • Monitor event processing latency
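
As one illustration of the batching advice, the sketch below buffers events and delivers them in groups once a size threshold is reached. `EventBatcher` and its `flush` callback are hypothetical; a production version would likely also flush on a timer so that small batches are not held indefinitely.

```typescript
class EventBatcher<T> {
  private buffer: T[] = [];
  private readonly maxSize: number;
  private readonly flush: (batch: T[]) => void;

  constructor(maxSize: number, flush: (batch: T[]) => void) {
    this.maxSize = maxSize;
    this.flush = flush;
  }

  // Queue an event; deliver the batch once it reaches maxSize.
  add(event: T): void {
    this.buffer.push(event);
    if (this.buffer.length >= this.maxSize) this.drain();
  }

  // Deliver whatever is buffered, for example at session end.
  drain(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.flush(batch);
  }
}

const batches: string[][] = [];
const batcher = new EventBatcher<string>(2, (batch) => {
  batches.push(batch);
});

['a', 'b', 'c'].forEach((event) => batcher.add(event));
batcher.drain();
console.log(batches); // [["a", "b"], ["c"]]
```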

4. Security

  • Sanitize event data before transmission
  • Implement proper authentication for event streams
  • Use encryption for sensitive event data
  • Audit event access patterns
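
Sanitizing event data before transmission can be as simple as redacting known sensitive fields from the payload. The sketch below does this recursively; the key list is an assumption chosen for illustration, matching is case-sensitive, and `redact` is a hypothetical helper rather than a Tarko API.

```typescript
// Assumed list of sensitive field names; adjust to the application's data.
const SENSITIVE_KEYS = new Set(['password', 'token', 'apiKey', 'authorization']);

// Return a deep copy of `value` with sensitive fields replaced by a marker.
function redact(value: unknown): unknown {
  if (Array.isArray(value)) return value.map((item) => redact(item));
  if (value !== null && typeof value === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) {
      out[key] = SENSITIVE_KEYS.has(key) ? '[REDACTED]' : redact(inner);
    }
    return out;
  }
  return value; // primitives pass through unchanged
}

const unsafeEvent = {
  type: 'tool_call',
  data: {
    name: 'http_get',
    arguments: { url: 'https://example.com', token: 'secret' },
  },
};

console.log(JSON.stringify(redact(unsafeEvent)));
```

Because `redact` returns a copy, the original event stays intact for local consumers while only the sanitized version leaves the process.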

Debugging and Monitoring

Event Inspection

typescript
import fs from 'node:fs';
import { Agent } from '@tarko/agent';

// Enable debug logging
const agent = new Agent({
  debug: {
    events: true,
    level: 'verbose'
  }
});

// Export event stream for analysis
const events = agent.getEventHistory();
fs.writeFileSync('events.json', JSON.stringify(events, null, 2));

// Real-time event monitoring
agent.on('*', (event) => {
  if (process.env.NODE_ENV === 'development') {
    console.log(`[${event.type}]`, event.data);
  }
});

Protocol Validation

typescript
import { validateEvent, EventSchema } from '@tarko/agent';

// Validate events against schema
agent.on('*', (event) => {
  const validation = validateEvent(event, EventSchema);
  if (!validation.valid) {
    console.warn('Invalid event:', validation.errors);
  }
});
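
The result shape (`valid`, `errors`) is taken from the example above; how the library performs the check is not shown here. For readers who want to see what such a check involves, a hand-rolled structural validation of the event envelope might look like the following sketch. `checkEventShape` is a hypothetical stand-in, not the library's `validateEvent`.

```typescript
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

// Check the required envelope fields of an event.
function checkEventShape(event: unknown): ValidationResult {
  if (event === null || typeof event !== 'object') {
    return { valid: false, errors: ['event must be an object'] };
  }
  const candidate = event as Record<string, unknown>;
  const errors: string[] = [];

  for (const field of ['id', 'type', 'sessionId']) {
    const value = candidate[field];
    if (typeof value !== 'string' || value === '') {
      errors.push(`${field} must be a non-empty string`);
    }
  }

  const timestamp = candidate['timestamp'];
  if (typeof timestamp !== 'number' || !Number.isFinite(timestamp)) {
    errors.push('timestamp must be a finite number');
  }

  if (!('data' in candidate)) errors.push('data is required');

  return { valid: errors.length === 0, errors };
}

console.log(checkEventShape({ type: 'user_message' }).errors);
```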

Next Steps