The iii Engine uses a WebSocket-based JSON protocol for communication between the engine and workers.
The protocol defines specific message types for different operations:
| Message Type | Direction | Description |
|---|---|---|
| `registerfunction` | Worker → Engine | Declares a function available for execution |
| `registertrigger` | Worker → Engine | Configures a trigger (API route, event, cron) |
| `invokefunction` | Bidirectional | Requests execution of a specific function |
| `invocationresult` | Bidirectional | Returns the result of a function execution |
| `ping` / `pong` | Bidirectional | Keep-alive mechanism |
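As a rough illustration of how a receiver might recognise these message types, the sketch below parses a raw text frame and rejects anything whose `type` is not in the table. The names `MessageType`, `ProtocolMessage`, `parseMessage`, and `replyTo` are hypothetical, and two things are assumptions rather than documented behaviour: that `ping` and `pong` travel as `{ type: "ping" }` / `{ type: "pong" }`, and that a ping is answered with a pong.

```typescript
// Hypothetical type names; only the `type` strings come from the table above.
type MessageType =
  | "registerfunction"
  | "registertrigger"
  | "invokefunction"
  | "invocationresult"
  | "ping"
  | "pong";

const KNOWN_TYPES: readonly MessageType[] = [
  "registerfunction",
  "registertrigger",
  "invokefunction",
  "invocationresult",
  "ping",
  "pong",
];

interface ProtocolMessage {
  type: MessageType;
  [field: string]: unknown;
}

// Parse one WebSocket text frame; returns null for anything unrecognised.
function parseMessage(raw: string): ProtocolMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== "object" || value === null) return null;
  const type = (value as { type?: unknown }).type;
  if (typeof type !== "string") return null;
  if ((KNOWN_TYPES as readonly string[]).indexOf(type) === -1) return null;
  return value as ProtocolMessage;
}

// Assumption: a ping is answered with a pong. The payload shape is not
// specified in the table, so this reply carries only the `type` field.
function replyTo(message: ProtocolMessage): ProtocolMessage | null {
  return message.type === "ping" ? { type: "pong" } : null;
}
```

Returning `null` for unknown input keeps the caller's dispatch logic free of exception handling. Whether the engine itself ignores or rejects unknown types is not stated here.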
When a worker connects, it must register its capabilities:
sequenceDiagram
participant W as Worker
participant E as Engine
Note over W,E: WebSocket Connection Established
W->>E: registerfunction { id: "api::echo" }
W->>E: registertrigger { trigger_type: "http", function_id: "api::echo", ... }
Note over E: Engine updates internal registry
E-->>W: workerregistered { worker_id }
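The registration flow in the diagram could be sketched from the worker's side roughly as follows. This is a minimal sketch, not the SDK's implementation: `buildRegistration` and `register` are hypothetical helpers, `send` stands in for whatever writes a text frame to the open WebSocket, and the trigger `id` and `/echo` path are made-up values.

```typescript
// Hypothetical shapes for illustration; field names follow the diagram above.
interface RegisterFunction {
  type: "registerfunction";
  id: string;
}

interface RegisterTrigger {
  type: "registertrigger";
  id: string;
  trigger_type: string;
  function_id: string;
  config: Record<string, unknown>;
}

type RegistrationMessage = RegisterFunction | RegisterTrigger;

// Build the frames a worker sends after connecting: the function first,
// then the trigger that refers to it, mirroring the order in the diagram.
function buildRegistration(
  functionId: string,
  trigger: { id: string; trigger_type: string; config: Record<string, unknown> }
): RegistrationMessage[] {
  const functionMessage: RegisterFunction = { type: "registerfunction", id: functionId };
  const triggerMessage: RegisterTrigger = {
    type: "registertrigger",
    id: trigger.id,
    trigger_type: trigger.trigger_type,
    function_id: functionId,
    config: trigger.config,
  };
  return [functionMessage, triggerMessage];
}

// Serialise each message and hand it to the transport in order.
function register(send: (frame: string) => void, messages: RegistrationMessage[]): void {
  for (const message of messages) {
    send(JSON.stringify(message));
  }
}

// Usage: collect the frames in memory instead of writing to a real socket.
const sentFrames: string[] = [];
register(
  (frame) => { sentFrames.push(frame); },
  buildRegistration("api::echo", {
    id: "echo-http", // made-up trigger id
    trigger_type: "http",
    config: { api_path: "/echo", http_method: "POST" },
  })
);
```

Sending the function before its trigger follows the diagram. Whether the engine requires that order is an assumption.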
Functions can be invoked synchronously (with response) or asynchronously (fire-and-forget):
sequenceDiagram
participant Client
participant Engine
participant Worker
Client->>+Engine: invokefunction { invocation_id: "123", ... }
Engine->>+Worker: invokefunction { invocation_id: "123", ... }
Note right of Worker: Execute Logic
Worker-->>-Engine: invocationresult { invocation_id: "123", result: {...} }
Engine-->>-Client: invocationresult { invocation_id: "123", result: {...} }
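One way a caller might tell the two invocation styles apart is to keep a table of pending `invocation_id` values and resolve each entry when the matching `invocationresult` arrives. The class below is a hypothetical sketch of that idea. `PendingInvocations`, its method names, and the counter-based id scheme are assumptions, and `send` again stands in for the socket write.

```typescript
// Hypothetical helper; message fields follow the diagram above.
interface InvocationResult {
  type: "invocationresult";
  invocation_id: string;
  result?: unknown;
  error?: { message: string; code: string };
}

interface Waiter {
  resolve: (value: unknown) => void;
  reject: (reason: Error) => void;
}

class PendingInvocations {
  private nextId = 1;
  private readonly waiting = new Map<string, Waiter>();
  private readonly send: (frame: string) => void;

  constructor(send: (frame: string) => void) {
    this.send = send;
  }

  // Synchronous style: attach an invocation_id and wait for the matching result.
  invoke(functionId: string, data: unknown): Promise<unknown> {
    const invocationId = String(this.nextId++);
    const promise = new Promise<unknown>((resolve, reject) => {
      this.waiting.set(invocationId, { resolve, reject });
    });
    this.send(JSON.stringify({
      type: "invokefunction",
      invocation_id: invocationId,
      function_id: functionId,
      data,
    }));
    return promise;
  }

  // Fire-and-forget: no invocation_id, so no result can be correlated.
  notify(functionId: string, data: unknown): void {
    this.send(JSON.stringify({ type: "invokefunction", function_id: functionId, data }));
  }

  // Call for every incoming invocationresult; returns false if nothing was waiting.
  settle(message: InvocationResult): boolean {
    const waiter = this.waiting.get(message.invocation_id);
    if (waiter === undefined) return false;
    this.waiting.delete(message.invocation_id);
    if (message.error !== undefined) {
      waiter.reject(new Error(`${message.error.code}: ${message.error.message}`));
    } else {
      waiter.resolve(message.result);
    }
    return true;
  }
}
```

A production version would also need a timeout so that entries do not stay in the map forever when a result never arrives. That is omitted here to keep the sketch short.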
Register a triggerable function with the engine:
{
type: 'registerfunction',
id: 'service::function', // e.g., "api::echo"
description?: 'Function description',
request_format?: { ... }, // JSON schema for input
response_format?: { ... }, // JSON schema for output
metadata?: { ... }
}
Configure a trigger that maps to a function:
{
type: 'registertrigger',
id: 'unique-trigger-id',
trigger_type: 'http' | 'queue' | 'cron' | 'log',
function_id: 'service::function',
config: {
// Trigger-specific configuration
// For 'http': { api_path: '/path', http_method: 'GET' }
// For 'queue': { topic: 'topic.name' }
// For 'cron': { expression: '0 * * * *' }
// For 'log': { level: 'error' }
}
}
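Because each `trigger_type` expects a different `config` shape, a discriminated union lets the compiler reject mismatched pairs. The sketch below takes the config fields from the comments in the definition above. The type names, the `registerTrigger` helper, and the example ids are hypothetical, and all config fields are typed as plain strings because the allowed values are not listed here.

```typescript
// Config shapes copied from the comments in the definition above.
type TriggerSpec =
  | { trigger_type: "http"; config: { api_path: string; http_method: string } }
  | { trigger_type: "queue"; config: { topic: string } }
  | { trigger_type: "cron"; config: { expression: string } }
  | { trigger_type: "log"; config: { level: string } };

interface RegisterTriggerMessage {
  type: "registertrigger";
  id: string;
  function_id: string;
  trigger_type: TriggerSpec["trigger_type"];
  config: TriggerSpec["config"];
}

// Hypothetical builder: the TriggerSpec parameter is what enforces that
// the config matches the trigger type at compile time.
function registerTrigger(
  id: string,
  functionId: string,
  spec: TriggerSpec
): RegisterTriggerMessage {
  return {
    type: "registertrigger",
    id,
    function_id: functionId,
    trigger_type: spec.trigger_type,
    config: spec.config,
  };
}

// Usage: an hourly cron trigger (ids are made up for the example).
const cronTrigger = registerTrigger("hourly-report", "reports::generate", {
  trigger_type: "cron",
  config: { expression: "0 * * * *" },
});

// Passing { trigger_type: "cron", config: { topic: "x" } } would fail to compile.
```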
Request execution of a registered function:
{
type: 'invokefunction',
invocation_id?: 'unique-id', // Omit for fire-and-forget; required when a result is expected
function_id: 'service::function',
data: { ... } // Function input
}
Return the result of a function execution:
{
type: 'invocationresult',
invocation_id: 'unique-id',
function_id: 'service::function',
result?: { ... }, // Success result
error?: { // Error result
message: 'Error description',
code: 'ERROR_CODE'
}
}
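On the receiving side, turning an `invokefunction` request into an `invocationresult` could look roughly like this. It is a sketch under several assumptions: handlers are synchronous, the `execute` helper and `Handler` type are hypothetical, and the error codes `FUNCTION_NOT_FOUND` and `HANDLER_ERROR` are invented for the example, since the protocol's actual codes are not listed here.

```typescript
interface InvokeFunction {
  type: "invokefunction";
  invocation_id?: string;
  function_id: string;
  data: unknown;
}

interface InvocationResult {
  type: "invocationresult";
  invocation_id: string;
  function_id: string;
  result?: unknown;
  error?: { message: string; code: string };
}

type Handler = (data: unknown) => unknown;

// Run the handler and wrap its outcome. Returns null when the request has no
// invocation_id, since a fire-and-forget call has nothing to correlate a reply with.
function execute(
  handlers: Map<string, Handler>,
  request: InvokeFunction
): InvocationResult | null {
  let result: unknown = undefined;
  let error: { message: string; code: string } | undefined = undefined;

  const handler = handlers.get(request.function_id);
  if (handler === undefined) {
    // Invented error code, see the note above.
    error = { message: `No handler for ${request.function_id}`, code: "FUNCTION_NOT_FOUND" };
  } else {
    try {
      result = handler(request.data);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      error = { message, code: "HANDLER_ERROR" }; // invented error code
    }
  }

  if (request.invocation_id === undefined) return null;

  const reply: InvocationResult = {
    type: "invocationresult",
    invocation_id: request.invocation_id,
    function_id: request.function_id,
  };
  if (error !== undefined) {
    reply.error = error;
  } else {
    reply.result = result;
  }
  return reply;
}
```

The reply carries either `result` or `error`, never both, which matches how the two optional fields are described above.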
The Streams module uses a specialized sub-protocol for real-time state synchronization.
// Subscribe to a stream
{
type: 'Join',
subscription_id: 'sub-123',
stream_name: 'todos',
group_id: 'user-456',
id: 'item-789'
}
// Unsubscribe from a stream
{
type: 'Leave',
subscription_id: 'sub-123'
}
// Stream event message
{
timestamp: 1234567890,
stream_name: 'todos',
group_id: 'user-456',
event: {
type: 'Sync' | 'Create' | 'Update' | 'Delete' | 'Event',
data: { ... }
}
}
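A client consuming this sub-protocol needs to remember what it has joined so that incoming events reach the right listener. The sketch below is one hypothetical way to do that. `StreamSubscriptions` and its methods are invented names, and because the event message shown above carries no `subscription_id`, the sketch assumes events are matched to subscriptions by `stream_name` and `group_id` only.

```typescript
interface Join {
  type: "Join";
  subscription_id: string;
  stream_name: string;
  group_id: string;
  id: string;
}

interface Leave {
  type: "Leave";
  subscription_id: string;
}

interface StreamMessage {
  timestamp: number;
  stream_name: string;
  group_id: string;
  event: { type: "Sync" | "Create" | "Update" | "Delete" | "Event"; data: unknown };
}

type Listener = (message: StreamMessage) => void;

class StreamSubscriptions {
  private readonly active = new Map<string, { join: Join; listener: Listener }>();

  // Record the subscription and return the Join frame to send.
  join(subscriptionId: string, streamName: string, groupId: string, id: string, listener: Listener): Join {
    const join: Join = {
      type: "Join",
      subscription_id: subscriptionId,
      stream_name: streamName,
      group_id: groupId,
      id,
    };
    this.active.set(subscriptionId, { join, listener });
    return join;
  }

  // Forget the subscription and return the Leave frame, or null if it was unknown.
  leave(subscriptionId: string): Leave | null {
    if (!this.active.delete(subscriptionId)) return null;
    return { type: "Leave", subscription_id: subscriptionId };
  }

  // Assumption: match on stream_name and group_id. Returns the number of listeners called.
  dispatch(message: StreamMessage): number {
    let delivered = 0;
    this.active.forEach(({ join, listener }) => {
      if (join.stream_name === message.stream_name && join.group_id === message.group_id) {
        listener(message);
        delivered += 1;
      }
    });
    return delivered;
  }
}
```

How each event type (`Sync`, `Create`, `Update`, `Delete`, `Event`) should change local state depends on the shape of `data`, which is not specified here, so the sketch leaves that to the listener.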