docs/api-reference/sdk-python.mdx
pip install iii-sdk
Create an III client and connect to the engine.
Blocks until the WebSocket connection is established and ready.
from iii import register_worker, InitOptions
iii = register_worker('ws://localhost:49134', InitOptions(worker_name='my-worker'))
Connect to the III Engine via WebSocket.
Initializes OpenTelemetry (if configured), attaches the event loop, and establishes the WebSocket connection. This is called automatically during construction -- use it only if you need to reconnect manually from an async context.
Signature
async ()
Create a streaming channel pair for worker-to-worker data transfer.
The returned Channel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
create_channel(buffer_size: int | None = None)
| Name | Type | Required | Description |
|---|---|---|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
ch = iii.create_channel()
fn = iii.register_function({"id": "producer"}, producer_handler)
iii.trigger({"function_id": "producer", "payload": {"output": ch.writer_ref}})
Create a streaming channel pair for worker-to-worker data transfer.
The returned Channel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
async (buffer_size: int | None = None)
| Name | Type | Required | Description |
|---|---|---|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
ch = await iii.create_channel_async()
fn = iii.register_function({"id": "producer"}, producer_handler)
await iii.trigger_async({"function_id": "producer", "payload": {"output": ch.writer_ref}})
Register a custom stream implementation, overriding the engine default.
Registers 5 of the 6 IStream methods (get, set, delete,
list, list_groups). The update method is not registered
-- atomic updates are handled by the engine's built-in stream update logic.
Signature
create_stream(stream_name: str, stream: IStream[Any])
| Name | Type | Required | Description |
|---|---|---|---|
stream_name | str | Yes | Unique name for the stream. |
stream | IStream[Any] | Yes | An object implementing the IStream interface. |
from iii.stream import IStream
class MyStream(IStream):
async def get(self, input): ...
async def set(self, input): ...
async def delete(self, input): ...
async def list(self, input): ...
async def list_groups(self, input): ...
async def update(self, input): ...
iii.create_stream("my-stream", MyStream())
Return the current WebSocket connection state.
Signature
get_connection_state()
List all functions registered with the engine across all workers.
Signature
list_functions()
for fn in iii.list_functions():
print(fn.function_id, fn.description)
List all functions registered with the engine across all workers.
Signature
async ()
for fn in await iii.list_functions_async():
print(fn.function_id, fn.description)
List all triggers registered with the engine.
Signature
list_triggers(include_internal: bool = False)
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |
triggers = iii.list_triggers()
internal = iii.list_triggers(include_internal=True)
List all triggers registered with the engine.
Signature
async (include_internal: bool = False)
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |
triggers = await iii.list_triggers_async()
internal = await iii.list_triggers_async(include_internal=True)
List all workers currently connected to the engine.
Signature
list_workers()
for w in iii.list_workers():
print(w.name, w.worker_id)
List all workers currently connected to the engine.
Signature
async ()
for w in await iii.list_workers_async():
print(w.name, w.worker_id)
Subscribe to function-availability events from the engine.
The callback fires whenever the set of available functions changes (e.g. a new worker connects or a function is unregistered).
Signature
on_functions_available(callback: Callable[None])
| Name | Type | Required | Description |
|---|---|---|---|
callback | Callable[None] | Yes | - |
def on_change(functions):
print("Available:", [f.function_id for f in functions])
unsub = iii.on_functions_available(on_change)
# later ...
unsub()
Register a function with the engine.
Pass a handler for local execution, or an HttpInvocationConfig
for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).
Handlers can be synchronous or asynchronous. Sync handlers are
automatically wrapped with run_in_executor so they do not
block the event loop. Each handler receives a single data
argument containing the trigger payload.
Signature
register_function(func: RegisterFunctionInput | dict[str, Any], handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig)
| Name | Type | Required | Description |
|---|---|---|---|
func | RegisterFunctionInput | dict[str, Any] | Yes | A RegisterFunctionInput or dict with id and optional description, metadata, request_format, response_format. |
handler_or_invocation | RemoteFunctionHandler | HttpInvocationConfig | Yes | A callable handler or HttpInvocationConfig. Callable handlers receive one positional argument (data -- the trigger payload) and may return a value. |
def greet(data):
return {'message': f"Hello, {data['name']}!"}
fn = iii.register_function({"id": "greet", "description": "Greets a user"}, greet)
fn.unregister()
Register a logical service grouping with the engine.
Services provide an organisational hierarchy for functions. A
service can optionally reference a parent_service_id to form
a tree visible in the engine dashboard.
Signature
register_service(service: RegisterServiceInput | dict[str, Any])
| Name | Type | Required | Description |
|---|---|---|---|
service | RegisterServiceInput | dict[str, Any] | Yes | A RegisterServiceInput or dict with id and optional name, description, parent_service_id. |
iii.register_service({"id": "payments", "description": "Payment processing"})
iii.register_service({
"id": "payments::refunds",
"description": "Refund sub-service",
"parent_service_id": "payments",
})
Bind a trigger configuration to a registered function.
Signature
register_trigger(trigger: RegisterTriggerInput | dict[str, Any])
| Name | Type | Required | Description |
|---|---|---|---|
trigger | RegisterTriggerInput | dict[str, Any] | Yes | A RegisterTriggerInput or dict with type, function_id, and optional config. |
trigger = iii.register_trigger({
'type': 'http',
'function_id': 'greet',
'config': {'api_path': '/greet', 'http_method': 'GET'}
})
trigger = iii.register_trigger(RegisterTriggerInput(
type="http", function_id="greet",
config={'api_path': '/greet', 'http_method': 'GET'}
))
trigger.unregister()
Register a custom trigger type with the engine.
Signature
register_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any], handler: TriggerHandler[Any])
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and description. |
handler | TriggerHandler[Any] | Yes | A TriggerHandler instance. Must implement both register_trigger(config) and unregister_trigger(trigger) async methods. register_trigger is called when a trigger of this type is bound to a function, and unregister_trigger is called when the binding is removed. |
iii.register_trigger_type({"id": "webhook", "description": "Webhook trigger"}, handler)
iii.register_trigger_type(
RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"), handler
)
Gracefully shut down the client, releasing all resources.
Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.
Signature
shutdown()
iii = register_worker('ws://localhost:49134')
# ... do work ...
iii.shutdown()
Gracefully shut down the client, releasing all resources.
Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.
Signature
async ()
iii = register_worker('ws://localhost:49134')
# ... do work ...
await iii.shutdown_async()
Invoke a remote function.
The routing behavior and return type depend on the action field:
TriggerAction.Enqueue(...): async via named queue -- returns EnqueueResult.TriggerAction.Void(): fire-and-forget -- returns None.Signature
trigger(request: dict[str, Any] | TriggerRequest)
| Name | Type | Required | Description |
|---|---|---|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
result = iii.trigger({'function_id': 'greet', 'payload': {'name': 'World'}})
iii.trigger({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
Invoke a remote function.
The routing behavior and return type depend on the action field:
TriggerAction.Enqueue(...): async via named queue -- returns EnqueueResult.TriggerAction.Void(): fire-and-forget -- returns None.Signature
async (request: dict[str, Any] | TriggerRequest)
| Name | Type | Required | Description |
|---|---|---|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
result = await iii.trigger_async({'function_id': 'greet', 'payload': {'name': 'World'}})
await iii.trigger_async({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
Unregister a previously registered trigger type.
Signature
unregister_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any])
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and optional description. |
iii.unregister_trigger_type({"id": "webhook", "description": "Webhook trigger"})
iii.unregister_trigger_type(RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"))
Structured logger that emits logs as OpenTelemetry LogRecords.
Every log call automatically captures the active trace and span context,
correlating your logs with distributed traces without any manual wiring.
When OTel is not initialized, Logger gracefully falls back to Python
logging.
Pass structured data as the second argument to any log method. Using a dict of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
Log a debug-level message.
Signature
debug(message: str, data: Any = None)
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.debug('Cache lookup', {'key': 'user:42', 'hit': False})
Log an error-level message.
Signature
error(message: str, data: Any = None)
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.error('Payment failed', {
'order_id': 'ord_123',
'gateway': 'stripe',
'error_code': 'card_declined',
})
Log an info-level message.
Signature
info(message: str, data: Any = None)
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.info('Order processed', {'order_id': 'ord_123', 'status': 'completed'})
Log a warning-level message.
Signature
warn(message: str, data: Any = None)
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})
InitOptions · ReconnectionConfig · TelemetryOptions · HttpInvocationConfig · RegisterFunctionFormat · RegisterFunctionInput · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · OtelConfig · TriggerHandler
Options for configuring the III SDK.
| Name | Type | Required | Description |
|---|---|---|---|
enable_metrics_reporting | bool | No | Enable worker metrics via OpenTelemetry. Default True. |
invocation_timeout_ms | int | No | Default timeout for trigger() in milliseconds. Default 30000. |
otel | OtelConfig | dict[str, Any] | None | No | OpenTelemetry configuration. Enabled by default. Set \{'enabled': False\} or env OTEL_ENABLED=false to disable. |
reconnection_config | ReconnectionConfig | None | No | WebSocket reconnection behavior. |
telemetry | TelemetryOptions | None | No | Internal telemetry metadata. |
worker_name | str | None | No | Display name for this worker. Defaults to hostname:pid. |
Configuration for WebSocket reconnection behavior.
| Name | Type | Required | Description |
|---|---|---|---|
backoff_multiplier | float | No | Exponential backoff multiplier. Default 2.0. |
initial_delay_ms | int | No | Starting delay in milliseconds. Default 1000. |
jitter_factor | float | No | Random jitter factor (0--1). Default 0.3. |
max_delay_ms | int | No | Maximum delay cap in milliseconds. Default 30000. |
max_retries | int | No | Maximum retry attempts. -1 for infinite. Default -1. |
Telemetry metadata to be reported to the engine.
| Name | Type | Required | Description |
|---|---|---|---|
amplitude_api_key | str | None | No | Amplitude API key for product analytics. |
framework | str | None | No | Framework name (e.g. motia) if applicable. |
language | str | None | No | Programming language of the worker (e.g. python). |
project_name | str | None | No | Name of the project this worker belongs to. |
Config for HTTP external function invocation.
| Name | Type | Required | Description |
|---|---|---|---|
auth | HttpAuthConfig | None | No | Authentication configuration (bearer, HMAC, or API key). |
headers | dict[str, str] | None | No | Additional HTTP headers to include in the request. |
method | Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] | No | HTTP method. Defaults to 'POST'. |
timeout_ms | int | None | No | Request timeout in milliseconds. |
url | str | No | Target URL for the HTTP invocation. |
Format definition for function parameters.
| Name | Type | Required | Description |
|---|---|---|---|
body | list[RegisterFunctionFormat] | None | No | Nested fields for object types. |
description | str | None | No | Human-readable description of the parameter. |
items | RegisterFunctionFormat | None | No | Item schema for array types. |
name | str | Yes | Parameter name. |
required | bool | No | Whether the parameter is required. |
type | str | Yes | Type string (string, number, boolean, object, array, null, map). |
Input for registering a function — matches Node.js RegisterFunctionInput.
| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | Human-readable description. |
id | str | No | Unique function identifier. |
invocation | HttpInvocationConfig | None | No | HTTP invocation config for externally hosted functions. |
metadata | dict[str, Any] | None | No | Arbitrary metadata attached to the function. |
request_format | RegisterFunctionFormat | None | No | Schema describing expected input. |
response_format | RegisterFunctionFormat | None | No | Schema describing expected output. |
Input for registering a service (matches Node SDK's RegisterServiceInput).
| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | Description of the service. |
id | str | No | Unique service identifier. |
name | str | None | No | Human-readable service name. |
parent_service_id | str | None | No | ID of the parent service for hierarchical grouping. |
Input for registering a trigger (matches Node SDK's RegisterTriggerInput).
| Name | Type | Required | Description |
|---|---|---|---|
config | Any | No | Trigger-type-specific configuration. |
function_id | str | No | ID of the function this trigger invokes. |
type | str | No | Trigger type identifier (e.g. http, queue, cron). |
Input for registering a trigger type (matches Node SDK's RegisterTriggerTypeInput).
| Name | Type | Required | Description |
|---|---|---|---|
description | str | No | Human-readable description of the trigger type. |
id | str | No | Unique identifier for the trigger type. |
Routes the invocation through a named queue for async processing.
| Name | Type | Required | Description |
|---|---|---|---|
queue | str | Yes | Name of the target queue. |
type | Literal['enqueue'] | No | Always 'enqueue'. |
Fire-and-forget routing. No response is returned.
| Name | Type | Required | Description |
|---|---|---|---|
type | Literal['void'] | No | Always 'void'. |
Request object for trigger().
| Name | Type | Required | Description |
|---|---|---|---|
action | TriggerActionEnqueue | TriggerActionVoid | None | No | Routing action — None for sync, TriggerAction.Enqueue(...) for queue, TriggerAction.Void() for fire-and-forget. |
function_id | str | No | ID of the function to invoke. |
payload | Any | No | Data to pass to the function. |
timeout_ms | int | None | No | Override the default invocation timeout. |
Abstract interface for stream operations.
Configuration for OpenTelemetry initialization.
| Name | Type | Required | Description |
|---|---|---|---|
enabled | bool | None | No | Enable OTel. Defaults to True. Set OTEL_ENABLED=false/0/no/off to disable. |
engine_ws_url | str | None | No | III Engine WebSocket URL. Defaults to env III_URL or 'ws://localhost:49134'. |
fetch_instrumentation_enabled | bool | No | Auto-instrument urllib HTTP calls via URLLibInstrumentor. Defaults to True. |
logs_batch_size | int | None | No | Maximum number of log records exported per batch. Defaults to 1 when not set. |
logs_enabled | bool | None | No | Enable OTel log export via EngineLogExporter. Defaults to True when OTel is enabled. |
logs_flush_interval_ms | int | None | No | Log processor flush delay in milliseconds. Defaults to 100ms when not set. |
metrics_enabled | bool | No | Enable OTel metrics export via EngineMetricsExporter. Defaults to True. |
metrics_export_interval_ms | int | No | Metrics export interval in milliseconds. Defaults to 60000 (60 seconds). |
service_instance_id | str | None | No | Service instance ID. Defaults to a random UUID. |
service_name | str | None | No | Service name. Defaults to env OTEL_SERVICE_NAME or 'iii-python-sdk'. |
service_namespace | str | None | No | Service namespace attribute. |
service_version | str | None | No | Service version. Defaults to env SERVICE_VERSION or 'unknown'. |
Abstract base class for trigger handlers.