docs/api-reference/sdk-node.mdx
npm install iii-sdk
Creates and returns a connected SDK instance. The WebSocket connection is
established automatically -- there is no separate connect() call.
import { registerWorker } from 'iii-sdk'
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134', {
workerName: 'my-worker',
})
Creates a streaming channel pair for worker-to-worker data transfer. Returns a Channel with a local writer/reader and serializable refs that can be passed as fields in the invocation data to other functions.
Signature
createChannel(bufferSize?: number) => Promise<Channel>
| Name | Type | Required | Description |
|---|---|---|---|
bufferSize | number | No | Optional buffer size for the channel (default: 64) |
const channel = await iii.createChannel()
// Pass the writer ref to another function
await iii.trigger({
function_id: 'stream-producer',
payload: { outputChannel: channel.writerRef },
})
// Read data locally
channel.reader.onMessage((msg) => {
console.log('Received:', msg)
})
Registers a custom stream implementation under the given stream name.
This overrides the engine's built-in stream implementation.
Signature
createStream(streamName: string, stream: IStream<TData>) => void
| Name | Type | Required | Description |
|---|---|---|---|
streamName | string | Yes | The name of the stream |
stream | IStream<TData> | Yes | The stream implementation |
const redisStream: IStream<UserSession> = {
async get({ group_id, item_id }) {
return JSON.parse(await redis.get(`${group_id}:${item_id}`) ?? 'null')
},
async set({ group_id, item_id, data }) {
const old = await this.get({ stream_name: 'sessions', group_id, item_id })
await redis.set(`${group_id}:${item_id}`, JSON.stringify(data))
return { old_value: old ?? undefined, new_value: data }
},
async delete({ group_id, item_id }) {
const old = await this.get({ stream_name: 'sessions', group_id, item_id })
await redis.del(`${group_id}:${item_id}`)
return { old_value: old ?? undefined }
},
async list({ group_id }) { return [] },
async listGroups() { return [] },
async update({ group_id, item_id, ops }) { return { new_value: {} } },
}
iii.createStream('sessions', redisStream)
Lists all registered functions.
Signature
listFunctions() => Promise<FunctionInfo[]>
const functions = await iii.listFunctions()
for (const fn of functions) {
console.log(`${fn.function_id}: ${fn.description}`)
}
Lists all registered triggers.
Signature
listTriggers(includeInternal?: boolean) => Promise<TriggerInfo[]>
| Name | Type | Required | Description |
|---|---|---|---|
includeInternal | boolean | No | Whether to include internal triggers (default: false) |
Registers a callback to receive the current functions list when the engine announces changes. Returns a function that, when called, stops the callback from receiving further updates.
Signature
onFunctionsAvailable(callback: FunctionsAvailableCallback) => () => void
| Name | Type | Required | Description |
|---|---|---|---|
callback | FunctionsAvailableCallback | Yes | - |
const unsubscribe = iii.onFunctionsAvailable((functions) => {
console.log(`${functions.length} functions available:`)
for (const fn of functions) {
console.log(` - ${fn.function_id}`)
}
})
// Later, stop listening
unsubscribe()
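Registers a new function together with its handler. Returns a FunctionRef containing the function's id and an unregister() method.
Signature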
registerFunction(func: RegisterFunctionInput, handler: RemoteFunctionHandler) => FunctionRef
| Name | Type | Required | Description |
|---|---|---|---|
func | RegisterFunctionInput | Yes | - |
handler | RemoteFunctionHandler | Yes | - |
Registers a new service.
Signature
registerService(message: RegisterServiceInput) => void
| Name | Type | Required | Description |
|---|---|---|---|
message | RegisterServiceInput | Yes | The service to register |
Registers a new trigger. A trigger is a way to invoke a function when a certain event occurs.
Signature
registerTrigger(trigger: RegisterTriggerInput) => Trigger
| Name | Type | Required | Description |
|---|---|---|---|
trigger | RegisterTriggerInput | Yes | The trigger to register |
const trigger = iii.registerTrigger({
type: 'cron',
function_id: 'my-service::process-batch',
config: { schedule: '*/5 * * * *' },
})
// Later, remove the trigger
trigger.unregister()
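Registers a new trigger type. A trigger type defines a kind of event source (for example, cron) that triggers can be registered against; its handler is called whenever a trigger instance of that type is registered or unregistered.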
Signature
registerTriggerType(triggerType: RegisterTriggerTypeInput, handler: TriggerHandler<TConfig>) => void
| Name | Type | Required | Description |
|---|---|---|---|
triggerType | RegisterTriggerTypeInput | Yes | The trigger type to register |
handler | TriggerHandler<TConfig> | Yes | The handler for the trigger type |
type CronConfig = { schedule: string }
iii.registerTriggerType<CronConfig>(
{ id: 'cron', description: 'Fires on a cron schedule' },
{
async registerTrigger({ id, function_id, config }) {
startCronJob(id, config.schedule, () =>
iii.trigger({ function_id, payload: {} }),
)
},
async unregisterTrigger({ id }) {
stopCronJob(id)
},
},
)
Gracefully shuts down the iii instance, cleaning up all resources.
Signature
shutdown() => Promise<void>
process.on('SIGTERM', async () => {
await iii.shutdown()
process.exit(0)
})
Invokes a function using a request object.
Signature
trigger(request: TriggerRequest<TInput>) => Promise<TOutput>
| Name | Type | Required | Description |
|---|---|---|---|
request | TriggerRequest<TInput> | Yes | The trigger request containing function_id, payload, and optional action/timeout |
// Synchronous invocation
const result = await iii.trigger<{ name: string }, { message: string }>({
function_id: 'greet',
payload: { name: 'World' },
timeoutMs: 5000,
})
console.log(result.message) // "Hello, World!"
// Fire-and-forget
await iii.trigger({
function_id: 'send-email',
payload: { to: 'user@example.com' },
action: TriggerAction.Void(),
})
// Enqueue for async processing
const receipt = await iii.trigger({
function_id: 'process-order',
payload: { orderId: '123' },
action: TriggerAction.Enqueue({ queue: 'orders' }),
})
Unregisters a trigger type.
Signature
unregisterTriggerType(triggerType: RegisterTriggerTypeInput) => void
| Name | Type | Required | Description |
|---|---|---|---|
triggerType | RegisterTriggerTypeInput | Yes | The trigger type to unregister |
iii.unregisterTriggerType({ id: 'cron', description: 'Fires on a cron schedule' })
The iii-sdk package provides additional entry points:
| Import path | Contents |
|---|---|
iii-sdk | MessageType, ChannelReader, ChannelWriter, Logger, ISdk, ApiRequest, ApiResponse, Channel, EnqueueResult, FunctionRef, etc. |
iii-sdk/stream | IStream, StreamAuthInput, StreamAuthResult, StreamJoinLeaveEvent, StreamJoinResult, DeleteResult, StreamContext, StreamDeleteInput, StreamGetInput, StreamListGroupsInput, etc. |
iii-sdk/state | StateEventType, IState, StateEventData, DeleteResult, StateDeleteInput, StateDeleteResult, StateGetInput, StateListInput, StateSetInput, StateSetResult, etc. |
iii-sdk/telemetry | WorkerMetricsCollector, IIIReconnectionConfig, OtelConfig, ReconnectionConfig, WorkerGaugesOptions, WorkerMetricsCollectorOptions, FunctionInfo, FunctionsAvailableCallback, IIIConnectionState, OtelLogEvent, etc. |
Structured logger that emits logs as OpenTelemetry LogRecords.
Every log call automatically captures the active trace and span context,
correlating your logs with distributed traces without any manual wiring.
When OTel is not initialized, Logger gracefully falls back to console.*.
Pass structured data as the second argument to any log method. Using an object of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
Log a debug-level message.
Signature
debug(message: string, data: unknown) => void
| Name | Type | Required | Description |
|---|---|---|---|
message | string | Yes | Human-readable log message. |
data | unknown | Yes | Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.debug('Cache lookup', { key: 'user:42', hit: false })
Log an error-level message.
Signature
error(message: string, data: unknown) => void
| Name | Type | Required | Description |
|---|---|---|---|
message | string | Yes | Human-readable log message. |
data | unknown | Yes | Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.error('Payment failed', { orderId: 'ord_123', gateway: 'stripe', errorCode: 'card_declined' })
Log an info-level message.
Signature
info(message: string, data: unknown) => void
| Name | Type | Required | Description |
|---|---|---|---|
message | string | Yes | Human-readable log message. |
data | unknown | Yes | Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.info('Order processed', { orderId: 'ord_123', status: 'completed' })
Log a warning-level message.
Signature
warn(message: string, data: unknown) => void
| Name | Type | Required | Description |
|---|---|---|---|
message | string | Yes | Human-readable log message. |
data | unknown | Yes | Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
logger.warn('Retry attempt', { attempt: 3, maxRetries: 5, endpoint: '/api/charge' })
MessageType · ChannelReader · ChannelWriter · Channel · FunctionRef · HttpAuthConfig · HttpInvocationConfig · InitOptions · RegisterFunctionInput · RegisterFunctionMessage · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerMessage · RegisterTriggerTypeInput · RegisterTriggerTypeMessage · RemoteFunctionHandler · StreamChannelRef · Trigger · TriggerHandler · TriggerInfo · TriggerRequest · IStream · DeleteResult · StreamSetResult · StreamUpdateResult · DeleteResult · IIIReconnectionConfig · OtelConfig · ReconnectionConfig · FunctionInfo · FunctionsAvailableCallback · RegisterFunctionFormat
| Name | Type | Required | Description |
|---|---|---|---|
InvocationResult | "invocationresult" | Yes | - |
InvokeFunction | "invokefunction" | Yes | - |
RegisterFunction | "registerfunction" | Yes | - |
RegisterService | "registerservice" | Yes | - |
RegisterTrigger | "registertrigger" | Yes | - |
RegisterTriggerType | "registertriggertype" | Yes | - |
TriggerRegistrationResult | "triggerregistrationresult" | Yes | - |
UnregisterFunction | "unregisterfunction" | Yes | - |
UnregisterTrigger | "unregistertrigger" | Yes | - |
UnregisterTriggerType | "unregistertriggertype" | Yes | - |
WorkerRegistered | "workerregistered" | Yes | - |
Read end of a streaming channel. Provides both a Node.js Readable stream
for binary data and an onMessage callback for structured text messages.
| Name | Type | Required | Description |
|---|---|---|---|
stream | Readable | Yes | Node.js Readable stream for binary data. |
Write end of a streaming channel. Provides both a Node.js Writable stream
and a sendMessage method for sending structured text messages.
| Name | Type | Required | Description |
|---|---|---|---|
stream | Writable | Yes | Node.js Writable stream for binary data. |
A streaming channel pair for worker-to-worker data transfer. Created via ISdk.createChannel.
| Name | Type | Required | Description |
|---|---|---|---|
reader | ChannelReader | Yes | Reader end of the channel. |
readerRef | StreamChannelRef | Yes | Serializable reference to the reader (can be sent to other workers). |
writer | ChannelWriter | Yes | Writer end of the channel. |
writerRef | StreamChannelRef | Yes | Serializable reference to the writer (can be sent to other workers). |
Handle returned by ISdk.registerFunction. Contains the function's
id and an unregister() method.
| Name | Type | Required | Description |
|---|---|---|---|
id | string | Yes | The unique function identifier. |
unregister | () => void | Yes | Removes this function from the engine. |
Authentication configuration for HTTP-invoked functions.
- hmac -- HMAC signature verification using a shared secret.
- bearer -- Bearer token authentication.
- api_key -- API key sent via a custom header.

type HttpAuthConfig = unknown | unknown | unknown
Configuration for registering an HTTP-invoked function (Lambda, Cloudflare Workers, etc.) instead of a local handler.
| Name | Type | Required | Description |
|---|---|---|---|
auth | HttpAuthConfig | No | Authentication configuration. |
headers | Record<string, string> | No | Custom headers to send with the request. |
method | "GET" | "POST" | "PUT" | "PATCH" | "DELETE" | No | HTTP method. Defaults to POST. |
timeout_ms | number | No | Timeout in milliseconds. |
url | string | Yes | URL to invoke. |
Configuration options passed to registerWorker.
| Name | Type | Required | Description |
|---|---|---|---|
enableMetricsReporting | boolean | No | Enable worker metrics via OpenTelemetry. Defaults to true. |
invocationTimeoutMs | number | No | Default timeout for trigger() in milliseconds. Defaults to 30000. |
otel | Omit<OtelConfig, "engineWsUrl"> | No | OpenTelemetry configuration. OTel is initialized automatically by default. Set { enabled: false } or env OTEL_ENABLED=false/0/no/off to disable. The engineWsUrl is set automatically from the III address. |
reconnectionConfig | Partial<IIIReconnectionConfig> | No | WebSocket reconnection behavior. |
workerName | string | No | Display name for this worker. Defaults to hostname:pid. |
type RegisterFunctionInput = Omit<RegisterFunctionMessage, "message_type">
| Name | Type | Required | Description |
|---|---|---|---|
description | string | No | The description of the function |
id | string | Yes | The path of the function (use :: for namespacing, e.g. external::my_lambda) |
invocation | HttpInvocationConfig | No | HTTP invocation config for external HTTP functions (Lambda, Cloudflare Workers, etc.) |
message_type | MessageType.RegisterFunction | Yes | - |
metadata | Record<string, unknown> | No | - |
request_format | RegisterFunctionFormat | No | The request format of the function |
response_format | RegisterFunctionFormat | No | The response format of the function |
type RegisterServiceInput = Omit<RegisterServiceMessage, "message_type">
type RegisterTriggerInput = Omit<RegisterTriggerMessage, "message_type" | "id">
| Name | Type | Required | Description |
|---|---|---|---|
config | unknown | Yes | - |
function_id | string | Yes | - |
id | string | Yes | - |
message_type | MessageType.RegisterTrigger | Yes | - |
type | string | Yes | - |
type RegisterTriggerTypeInput = Omit<RegisterTriggerTypeMessage, "message_type">
| Name | Type | Required | Description |
|---|---|---|---|
description | string | Yes | - |
id | string | Yes | - |
message_type | MessageType.RegisterTriggerType | Yes | - |
Async function handler for a registered function. Receives the invocation payload and returns the result.
type RemoteFunctionHandler = (data: TInput) => Promise<TOutput>
Serializable reference to one end of a streaming channel. Can be included in invocation payloads to pass channel endpoints between workers.
| Name | Type | Required | Description |
|---|---|---|---|
access_key | string | Yes | Access key for authentication. |
channel_id | string | Yes | Unique channel identifier. |
direction | "read" | "write" | Yes | Whether this ref is for reading or writing. |
Handle returned by ISdk.registerTrigger. Use unregister() to
remove the trigger from the engine.
| Name | Type | Required | Description |
|---|---|---|---|
unregister | void | Yes | Removes this trigger from the engine. |
Handler interface for custom trigger types. Passed to
ISdk.registerTriggerType.
| Name | Type | Required | Description |
|---|---|---|---|
registerTrigger | Promise<void> | Yes | Called when a trigger instance is registered. |
unregisterTrigger | Promise<void> | Yes | Called when a trigger instance is unregistered. |
Information about a registered trigger.
| Name | Type | Required | Description |
|---|---|---|---|
config | unknown | No | Trigger-specific configuration. |
function_id | string | Yes | ID of the function this trigger is bound to. |
id | string | Yes | Unique trigger identifier. |
trigger_type | string | Yes | Type of the trigger (e.g. http, cron, queue). |
Request object passed to ISdk.trigger.
| Name | Type | Required | Description |
|---|---|---|---|
action | TriggerAction | No | Routing action. Omit for synchronous request/response. |
function_id | string | Yes | ID of the function to invoke. |
payload | TInput | Yes | Payload to pass to the function. |
timeoutMs | number | No | Override the default invocation timeout in milliseconds. |
Interface for custom stream implementations. Passed to ISdk.createStream
to override the engine's built-in stream storage.
| Name | Type | Required | Description |
|---|---|---|---|
delete | Promise<DeleteResult> | Yes | Delete a stream item. |
get | Promise<TData | null> | Yes | Retrieve a single item by group and item ID. |
list | Promise<TData[]> | Yes | List all items in a group. |
listGroups | Promise<string[]> | Yes | List all group IDs in a stream. |
set | Promise<StreamSetResult<TData> | null> | Yes | Set (create or overwrite) a stream item. |
update | Promise<StreamUpdateResult<TData> | null> | Yes | Apply atomic update operations to a stream item. |
Result of a stream delete operation.
| Name | Type | Required | Description |
|---|---|---|---|
old_value | any | No | Previous value (if it existed). |
Result of a stream set operation.
| Name | Type | Required | Description |
|---|---|---|---|
new_value | TData | Yes | New value that was stored. |
old_value | TData | No | Previous value (if it existed). |
Result of a stream update operation.
| Name | Type | Required | Description |
|---|---|---|---|
new_value | TData | Yes | New value after the update. |
old_value | TData | No | Previous value (if it existed). |
Result of a state delete operation.
| Name | Type | Required | Description |
|---|---|---|---|
old_value | any | No | Previous value (if it existed). |
Configuration for WebSocket reconnection behavior.
| Name | Type | Required | Description |
|---|---|---|---|
backoffMultiplier | number | Yes | Exponential backoff multiplier (default: 2) |
initialDelayMs | number | Yes | Starting delay in milliseconds (default: 1000ms) |
jitterFactor | number | Yes | Random jitter factor 0-1 (default: 0.3) |
maxDelayMs | number | Yes | Maximum delay cap in milliseconds (default: 30000ms) |
maxRetries | number | Yes | Maximum retry attempts, -1 for infinite (default: -1) |
Configuration for OpenTelemetry initialization.
| Name | Type | Required | Description |
|---|---|---|---|
enabled | boolean | No | Whether OpenTelemetry export is enabled. Defaults to true. Set to false or OTEL_ENABLED=false/0/no/off to disable. |
engineWsUrl | string | No | III Engine WebSocket URL. Defaults to III_URL or "ws://localhost:49134". |
fetchInstrumentationEnabled | boolean | No | Whether to auto-instrument globalThis.fetch calls. Defaults to true. Works on Node.js, Bun, and Deno. Set to false to disable. |
instrumentations | Instrumentation<InstrumentationConfig>[] | No | OpenTelemetry instrumentations to register (e.g., PrismaInstrumentation). |
logsBatchSize | number | No | Maximum number of log records exported per batch. Defaults to 1. |
logsFlushIntervalMs | number | No | Log processor flush delay in milliseconds. Defaults to 100ms. |
metricsEnabled | boolean | No | Whether OpenTelemetry metrics export is enabled. Defaults to true. Set to false or OTEL_METRICS_ENABLED=false/0/no/off to disable. |
metricsExportIntervalMs | number | No | Metrics export interval in milliseconds. Defaults to 60000 (60 seconds). |
reconnectionConfig | Partial<ReconnectionConfig> | No | Optional reconnection configuration for the WebSocket connection. |
serviceInstanceId | string | No | The service instance ID to report. Defaults to SERVICE_INSTANCE_ID env var or auto-generated UUID. |
serviceName | string | No | The service name to report. Defaults to OTEL_SERVICE_NAME or "iii-node". |
serviceNamespace | string | No | The service namespace to report. Defaults to SERVICE_NAMESPACE env var. |
serviceVersion | string | No | The service version to report. Defaults to SERVICE_VERSION env var or "unknown". |
Configuration for WebSocket reconnection behavior.
| Name | Type | Required | Description |
|---|---|---|---|
backoffMultiplier | number | Yes | Exponential backoff multiplier (default: 2) |
initialDelayMs | number | Yes | Starting delay in milliseconds (default: 1000ms) |
jitterFactor | number | Yes | Random jitter factor 0-1 (default: 0.3) |
maxDelayMs | number | Yes | Maximum delay cap in milliseconds (default: 30000ms) |
maxRetries | number | Yes | Maximum retry attempts, -1 for infinite (default: -1) |
Metadata about a registered function, returned by ISdk.listFunctions.
| Name | Type | Required | Description |
|---|---|---|---|
description | string | No | Human-readable description. |
function_id | string | Yes | Unique function identifier. |
metadata | Record<string, unknown> | No | Arbitrary metadata attached to the function. |
request_format | RegisterFunctionFormat | No | Schema describing expected request format. |
response_format | RegisterFunctionFormat | No | Schema describing expected response format. |
type FunctionsAvailableCallback = (functions: FunctionInfo[]) => void
| Name | Type | Required | Description |
|---|---|---|---|
body | RegisterFunctionFormat[] | No | The body of the parameter |
description | string | No | The description of the parameter |
items | RegisterFunctionFormat | No | The items of the parameter |
name | string | Yes | - |
required | boolean | No | Whether the parameter is required |
type | "string" | "number" | "boolean" | "object" | "array" | "null" | "map" | Yes | The type of the parameter |