docs/examples/observability.mdx
Every iii function invocation receives a traceId (via new Logger()) that is automatically propagated to downstream queue handlers. All logs emitted through the context logger are structured JSON and correlated to the active trace.
graph LR
Client -->|"POST /orders"| CreateOrder
CreateOrder -->|"enqueue + traceId"| ProcessOrder
ProcessOrder -->|"enqueue + traceId"| NotifyOrder
CreateOrder & ProcessOrder & NotifyOrder -->|"structured logs"| Observability
import {registerWorker, Logger, TriggerAction} from 'iii-sdk'
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134', {
otel: {
enabled: true,
serviceName: 'orders-service',
metricsEnabled: true,
},
})
// Step 1 — HTTP handler
iii.registerFunction(
{ id: 'orders.create', description: 'Creates an order and starts the processing pipeline' },
async (req: ApiRequest<{ customerId: string; amount: number; items: string[] }>) => {
const logger = new Logger()
const { customerId, amount, items } = req.body ?? {}
if (!customerId || !amount) {
return { status_code: 400, body: { error: 'customerId and amount are required' } }
}
const orderId = `order-${Date.now()}`
logger.info('Order created', {
orderId,
customerId,
amount,
traceId: currentTraceId(), // same traceId will appear in downstream logs
})
await iii.trigger({
function_id: 'state::set',
payload: {
scope: 'orders',
key: orderId,
value: { id: orderId, customerId, amount, items, status: 'created', createdAt: new Date().toISOString() },
},
action: TriggerAction.Void(),
})
await iii.trigger({
function_id: 'enqueue',
payload: { topic: 'order.process', data: { orderId, amount, customerId, items } },
action: TriggerAction.Void(),
})
return { status_code: 201, body: { orderId, status: 'processing' } }
},
)
iii.registerTrigger({
type: 'http',
function_id: 'orders.create',
config: { api_path: 'orders', http_method: 'POST' },
})
from datetime import datetime, timezone
from iii import register_worker, InitOptions, ApiRequest, ApiResponse, Logger, TriggerAction, current_trace_id
iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="orders-service"))
def create_order(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
body = req.body or {}
customer_id = body.get("customerId")
amount = body.get("amount")
if not customer_id or not amount:
return ApiResponse(status_code=400, body={"error": "customerId and amount are required"})
order_id = f"order-{int(__import__('time').time() * 1000)}"
logger.info("Order created", {
"orderId": order_id,
"customerId": customer_id,
"amount": amount,
"traceId": current_trace_id(), # same traceId will appear in downstream logs
})
iii.trigger({
"function_id": "state::set",
"payload": {
"scope": "orders",
"key": order_id,
"value": {
"id": order_id,
"customerId": customer_id,
"amount": amount,
"status": "created",
"createdAt": datetime.now(timezone.utc).isoformat(),
},
},
"action": TriggerAction.Void(),
})
iii.trigger({
"function_id": "enqueue",
"payload": {"topic": "order.process", "data": {"orderId": order_id, "amount": amount, "customerId": customer_id}},
"action": TriggerAction.Void(),
})
return ApiResponse(status_code=201, body={"orderId": order_id, "status": "processing"})
iii.register_function({"id": "orders.create"}, create_order)
iii.register_trigger({
"type": "http", "function_id": "orders.create",
"config": {"api_path": "orders", "http_method": "POST"},
})
use iii_sdk::{register_worker, InitOptions, Logger, TriggerRequest, TriggerAction, types::ApiRequest, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
let iii = register_worker("ws://localhost:49134", InitOptions::default());
iii.register_function(RegisterFunctionMessage { id: "orders.create".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
let req: ApiRequest = serde_json::from_value(input)?;
let customer_id = req.body["customerId"].as_str().unwrap_or("").to_string();
let amount = req.body["amount"].as_f64().unwrap_or(0.0);
if customer_id.is_empty() || amount == 0.0 {
return Ok(json!({
"status_code": 400,
"body": { "error": "customerId and amount are required" },
}));
}
let order_id = format!("order-{}", chrono::Utc::now().timestamp_millis());
logger.info("Order created", Some(json!({
"orderId": order_id,
"customerId": customer_id,
"amount": amount,
"traceId": current_trace_id(), // same traceId propagated to downstream handlers
})));
iii.trigger(TriggerRequest::new("state::set", json!({
"scope": "orders",
"key": order_id,
"value": {
"id": order_id,
"customerId": customer_id,
"amount": amount,
"status": "created",
"createdAt": chrono::Utc::now().to_rfc3339(),
},
})).action(TriggerAction::void())).await?;
iii.trigger(TriggerRequest::new("enqueue", json!({
"topic": "order.process",
"data": { "orderId": order_id, "amount": amount, "customerId": customer_id },
})).action(TriggerAction::void())).await?;
Ok(json!({ "status_code": 201, "body": { "orderId": order_id, "status": "processing" } }))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "http".into(), function_id: "orders.create".into(), config: json!({
"api_path": "orders",
"http_method": "POST",
}) })?;
iii.registerFunction(
{ id: 'order.process', description: 'Processes a created order' },
async (data: { orderId: string; amount: number; customerId: string }) => {
const logger = new Logger()
const { orderId, amount, customerId } = data
logger.info('Processing order', { orderId, amount, customerId, traceId: currentTraceId() })
await iii.trigger({
function_id: 'state::set',
payload: {
scope: 'orders',
key: orderId,
value: { status: 'processed', processedAt: new Date().toISOString() },
},
action: TriggerAction.Void(),
})
await iii.trigger({
function_id: 'enqueue',
payload: { topic: 'order.notify', data: { orderId, customerId, amount, status: 'processed' } },
action: TriggerAction.Void(),
})
logger.info('Order processed', { orderId, traceId: currentTraceId() })
},
)
iii.registerTrigger({
type: 'queue',
function_id: 'order.process',
config: { topic: 'order.process' },
})
from iii import Logger, current_trace_id
def process_order(data: dict) -> None:
logger = Logger()
order_id = data.get("orderId")
amount = data.get("amount")
customer_id = data.get("customerId")
logger.info("Processing order", {
"orderId": order_id,
"amount": amount,
"traceId": current_trace_id(),
})
iii.trigger({
"function_id": "state::set",
"payload": {
"scope": "orders",
"key": order_id,
"value": {
"status": "processed",
"processedAt": datetime.now(timezone.utc).isoformat(),
},
},
"action": TriggerAction.Void(),
})
iii.trigger({
"function_id": "enqueue",
"payload": {"topic": "order.notify", "data": {"orderId": order_id, "customerId": customer_id, "amount": amount, "status": "processed"}},
"action": TriggerAction.Void(),
})
logger.info("Order processed", {"orderId": order_id, "traceId": current_trace_id()})
iii.register_function({"id": "order.process"}, process_order)
iii.register_trigger({"type": "queue", "function_id": "order.process", "config": {"topic": "order.process"}})
use iii_sdk::{Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "order.process".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
let order_id = input["orderId"].as_str().unwrap_or("unknown");
logger.info("Processing order", Some(json!({
"orderId": order_id,
"traceId": current_trace_id(),
})));
iii.trigger(TriggerRequest::new("enqueue", json!({
"topic": "order.notify",
"data": { "orderId": order_id, "customerId": input["customerId"] },
})).action(TriggerAction::void())).await?;
logger.info("Order processed", Some(json!({ "orderId": order_id })));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "queue".into(), function_id: "order.process".into(), config: json!({ "topic": "order.process" }) })?;
iii.registerFunction(
{ id: 'order.notify', description: 'Sends order notification' },
async (data: { orderId: string; customerId: string; status: string; amount: number }) => {
const logger = new Logger()
const { orderId, customerId, status, amount } = data
logger.info('Sending order notification', {
orderId,
customerId,
status,
amount,
traceId: currentTraceId(),
})
// integrate with your notification service here
logger.info('Order notification sent', { orderId, traceId: currentTraceId() })
},
)
iii.registerTrigger({
type: 'queue',
function_id: 'order.notify',
config: { topic: 'order.notify' },
})
from iii import Logger, current_trace_id
def notify_order(data: dict) -> None:
logger = Logger()
order_id = data.get("orderId")
customer_id = data.get("customerId")
logger.info("Sending order notification", {
"orderId": order_id,
"customerId": customer_id,
"traceId": current_trace_id(),
})
# integrate with your notification service here
logger.info("Order notification sent", {"orderId": order_id, "traceId": current_trace_id()})
iii.register_function({"id": "order.notify"}, notify_order)
iii.register_trigger({"type": "queue", "function_id": "order.notify", "config": {"topic": "order.notify"}})
use iii_sdk::{Logger, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "order.notify".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
let order_id = input["orderId"].as_str().unwrap_or("unknown");
logger.info("Sending order notification", Some(json!({
"orderId": order_id,
"traceId": current_trace_id(),
})));
// integrate with your notification service here
logger.info("Order notification sent", Some(json!({ "orderId": order_id })));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "queue".into(), function_id: "order.notify".into(), config: json!({ "topic": "order.notify" }) })?;
// Pass otel config to registerWorker()
const iii = registerWorker('ws://localhost:49134', {
otel: {
enabled: true,
serviceName: 'my-service',
serviceVersion: '1.0.0',
metricsEnabled: true,
metricsExportIntervalMs: 10000,
reconnectionConfig: {
maxRetries: 10,
},
},
})
The iii SDK exports traces and metrics automatically via the engine's OpenTelemetry pipeline (OTLP over the WebSocket). No separate exporter configuration is required.
</Tab> <Tab title="Python"># OTel is handled by the engine — just pass the worker name in InitOptions
from iii import register_worker, InitOptions, Logger, current_trace_id
iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="my-service"))
# Traces are automatically correlated via current_trace_id()
# Access trace_id in any handler:
logger = Logger()
print(current_trace_id())
# Cargo.toml — enable the telemetry feature
[dependencies]
iii-sdk = { version = "0.1", features = ["telemetry"] }
use iii_sdk::{register_worker, InitOptions, Logger};
use serde_json::json;
// Initialize OTLP before connecting
iii_sdk::telemetry::init_otlp("my-service").await?;
let iii = register_worker("ws://127.0.0.1:49134", InitOptions::default());
// current_trace_id() is available in every handler
let logger = Logger();
logger.info("My message", Some(json!({ "traceId": current_trace_id() })));
| Method | TypeScript | Python | Rust |
|---|---|---|---|
| Info | logger.info(msg, metadata?) | logger.info(msg, fields?) | logger.info(msg, Some(json!({...}))) |
| Warning | logger.warn(msg, metadata?) | logger.warn(msg, fields?) | logger.warn(msg, None) |
| Error | logger.error(msg, metadata?) | logger.error(msg, fields?) | logger.error(msg, None) |
| Debug | logger.debug(msg, metadata?) | logger.debug(msg, fields?) | logger.debug(msg, None) |
currentTraceId() / current_trace_id() is the W3C trace-context ID for the current invocation. It is automatically propagated to all downstream workers when you emit an event — no manual header passing is needed.log.info / log.warn / log.error functions.registerWorker(). Python and Rust use the engine's built-in OTLP pipeline.traceId to make multi-step flows fully observable.