docs/modules/module-stream.mdx
Durable streams for real-time data subscriptions.
modules::stream::StreamModule
graph LR
Client[Client] -->|WebSocket| Stream[StreamModule]
Stream -->|Auth| Engine[Engine]
Engine -->|Context| Stream
Stream -->|Connected| Client
Worker[Worker] -->|stream::set| Engine
Engine -->|Persist + Notify| Stream
Stream -.->|Push Update| Client
Stream -.->|Fire Triggers| Engine
When a worker triggers stream::set, the engine:
stream triggers and fires matching handlersA single stream::set handles persistence, real-time delivery, and reactive logic in one operation.
Streams organize data hierarchically: stream_name > group_id > item_id.
chat, presence, dashboard)room-1, team-alpha)user-123, msg-456)Clients subscribe at the group level by connecting to ws://host:port/stream/{stream_name}/{group_id}/. They receive all item-level changes within that group.
- class: modules::stream::StreamModule
config:
port: ${STREAM_PORT:3112}
host: 0.0.0.0
adapter:
class: modules::stream::adapters::RedisAdapter
config:
redis_url: ${REDIS_URL:redis://localhost:6379}
Uses Redis as the backend for the streams. Stores stream data in Redis and leverages Redis Pub/Sub for real-time event delivery.
class: modules::stream::adapters::RedisAdapter
config:
redis_url: ${REDIS_URL:redis://localhost:6379}
Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.
class: modules::stream::adapters::KvStore
config:
store_method: file_based
file_path: ./data/streams_store.db
{' '}
<AccordionGroup> <Accordion title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream to retrieve the value from. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID in the stream to retrieve the value from. </ResponseField> <ResponseField name="item_id" type="string" required> The item ID in the stream to retrieve. </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="value" type="any" required> The value retrieved from the stream. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::delete" type="function"> Deletes a value from the stream. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream to delete the value from. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID in the stream to delete the value from. </ResponseField> <ResponseField name="item_id" type="string" required> The item ID in the stream to delete. </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="old_value" type="any"> The value that was deleted, or `null` if the item did not exist. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::list" type="function"> Retrieves a group from the stream. This function will return all the items in the group. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream to retrieve the group from. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID in the stream to retrieve the group from. </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="group" type="any[]" required> The group retrieved from the stream. It's an array of items in the group. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::list_groups" type="function"> List all groups in a stream. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream to list groups from. </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="groups" type="string[]" required> An array of group IDs in the stream. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::list_all" type="function"> List all streams with their group metadata. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> This function takes no parameters. </Accordion> <Accordion title="Returns"> <ResponseField name="stream" type="object[]" required> An array of stream metadata objects. Each object has an `id` (string) and a `groups` (string[]) field. </ResponseField> <ResponseField name="count" type="number" required> The total number of streams. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::send" type="function"> Send a custom event to all subscribers of a stream group. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream to send the event to. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID in the stream to send the event to. </ResponseField> <ResponseField name="type" type="string" required> The event type string delivered to subscribers. </ResponseField> <ResponseField name="id" type="string"> Optional item ID to associate the event with. </ResponseField> <ResponseField name="data" type="any" required> The event payload delivered to subscribers. </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="result" type="null"> Returns `null` on success. </ResponseField> </Accordion> </AccordionGroup> </ResponseField> <ResponseField name="stream::update" type="function"> Atomically update an item in the stream using a list of operations. <AccordionGroup> <Accordion iconName="settings" title="Parameters"> <ResponseField name="stream_name" type="string" required> The ID of the stream containing the item to update. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID in the stream containing the item to update. </ResponseField> <ResponseField name="item_id" type="string" required> The item ID in the stream to update. </ResponseField> <ResponseField name="ops" type="UpdateOp[]" required> The list of atomic operations to apply. Each operation is a tagged object with a `type` field (`set`, `merge`, `increment`, `decrement`, or `remove`) and associated fields (`path`, `value`, `by`). </ResponseField> </Accordion> <Accordion title="Returns"> <ResponseField name="old_value" type="any"> The previous value, or `null` if the item was newly created. </ResponseField> <ResponseField name="new_value" type="any" required> The value now stored in the stream after applying the operations. </ResponseField> </Accordion> </AccordionGroup> </ResponseField>It's possible to implement a function to handle authentication.
iii.register_function({'id': 'onAuth'}, on_auth)
</Tab>
<Tab title="Rust">
```rust
iii.register_function(RegisterFunctionMessage { id: "onAuth".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |_input| async move {
Ok(json!({ "context": { "name": "John Doe" } }))
});
- class: modules::stream::StreamModule
config:
auth_function: onAuth
onAuth will be called with the request data.This module adds three trigger types: stream (item changes), stream:join (WebSocket connect), and stream:leave (WebSocket disconnect).
Fire when a client connects or disconnects via WebSocket. Both trigger types deliver the same payload to the handler:
<ResponseField name="subscription_id" type="string" required> The subscription ID, used for uniqueness and logging. </ResponseField> <ResponseField name="stream_name" type="string" required> The stream name of the subscription. </ResponseField> <ResponseField name="group_id" type="string" required> The group ID of the subscription. </ResponseField> <ResponseField name="id" type="string"> The item ID of the subscription, if provided by the client. </ResponseField> <ResponseField name="context" type="object"> The context generated by the authentication layer. </ResponseField>Fires when an item changes in the stream (via stream::set, stream::update, or stream::delete). Register with a config object to filter which stream, group, or item triggers the handler:
iii.registerTrigger({ type: 'stream:join', function_id: fn.id, config: {}, })
</Tab>
<Tab title="Python">
```python
def on_join(input):
print('Joined stream', input)
return {}
iii.register_function({'id': 'onJoin'}, on_join)
iii.register_trigger({'type': 'stream:join', 'function_id': 'onJoin', 'config': {}})
iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({}) })?;
</Tab>
</Tabs>
### Usage Example: Real-Time Presence
Streams organize data by `stream_name`, `group_id`, and `item_id`. Use for live presence, collaborative docs, or dashboards:
<Tabs>
<Tab title="Node / TypeScript">
```typescript
import { registerWorker, TriggerAction } from 'iii-sdk'
const iii = registerWorker('ws://localhost:49134')
iii.trigger({
function_id: 'stream::set',
payload: {
stream_name: 'presence',
group_id: 'room-1',
item_id: 'user-123',
data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
},
action: TriggerAction.Void(),
})
const user = await iii.trigger({
function_id: 'stream::get',
payload: {
stream_name: 'presence',
group_id: 'room-1',
item_id: 'user-123',
},
})
const roomMembers = await iii.trigger({
function_id: 'stream::list',
payload: {
stream_name: 'presence',
group_id: 'room-1',
},
})
iii.trigger({
function_id: 'stream::delete',
payload: {
stream_name: 'presence',
group_id: 'room-1',
item_id: 'user-123',
},
action: TriggerAction.Void(),
})
iii = register_worker('ws://localhost:49134')
iii.trigger({ 'function_id': 'stream::set', 'payload': { 'stream_name': 'presence', 'group_id': 'room-1', 'item_id': 'user-123', 'data': {'name': 'Alice', 'online': True, 'lastSeen': '2026-01-01T00:00:00Z'}, }, 'action': TriggerAction.Void(), })
user = iii.trigger({ 'function_id': 'stream::get', 'payload': { 'stream_name': 'presence', 'group_id': 'room-1', 'item_id': 'user-123', }, })
room_members = iii.trigger({ 'function_id': 'stream::list', 'payload': { 'stream_name': 'presence', 'group_id': 'room-1', }, })
iii.trigger({ 'function_id': 'stream::delete', 'payload': { 'stream_name': 'presence', 'group_id': 'room-1', 'item_id': 'user-123', }, 'action': TriggerAction.Void(), })
</Tab>
<Tab title="Rust">
```rust
use iii_sdk::{register_worker, InitOptions, TriggerRequest, TriggerAction};
use serde_json::json;
let iii = register_worker("ws://localhost:49134", InitOptions::default());
iii.trigger(TriggerRequest::new("stream::set", json!({
"stream_name": "presence",
"group_id": "room-1",
"item_id": "user-123",
"data": { "name": "Alice", "online": true }
})).action(TriggerAction::void())).await?;
let user = iii.trigger(TriggerRequest::new("stream::get", json!({
"stream_name": "presence",
"group_id": "room-1",
"item_id": "user-123"
}))).await?;
let room_members = iii.trigger(TriggerRequest::new("stream::list", json!({
"stream_name": "presence",
"group_id": "room-1"
}))).await?;
iii.trigger(TriggerRequest::new("stream::delete", json!({
"stream_name": "presence",
"group_id": "room-1",
"item_id": "user-123"
})).action(TriggerAction::void())).await?;
Clients connect via WebSocket to ws://host:3112/stream/presence/room-1/ and receive real-time updates when items change.
Configure the stream module with an auth function:
- class: modules::stream::StreamModule
config:
port: 3112
host: 0.0.0.0
auth_function: stream::auth
adapter:
class: modules::stream::adapters::KvStore
config:
store_method: file_based
file_path: ./data/stream_store
Register the auth function. Clients may send the token via Authorization: Bearer <token> (Node.js) or Sec-WebSocket-Protocol: Authorization,<token> (browser stream-client):
iii.register_function({'id': 'stream::auth'}, stream_auth)
</Tab>
<Tab title="Rust">
```rust
iii.register_function(RegisterFunctionMessage { id: "stream::auth".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let headers = input.get("headers").and_then(|h| h.as_object());
let token = headers
.and_then(|h| h.get("authorization"))
.and_then(|v| v.as_str())
.and_then(|s| s.strip_prefix("Bearer "));
match token {
Some(_) => Ok(json!({ "context": { "userId": "user-from-token" } })),
None => Ok(json!({ "context": null })),
}
});
Join/leave triggers receive the auth context:
iii.registerTrigger({ type: 'stream:join', function_id: fn.id, config: {}, })
</Tab>
<Tab title="Python">
```python
def on_join(input):
context = input.get('context', {})
if context.get('userId'):
print(f"User {context['userId']} joined {input['stream_name']}/{input['group_id']}/{input.get('id', '')}")
return {}
iii.register_function({'id': 'onJoin'}, on_join)
iii.register_trigger({'type': 'stream:join', 'function_id': 'onJoin', 'config': {}})
iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({}) })?;
</Tab>
</Tabs>
### Usage Example: Conditional Join
<Tabs>
<Tab title="Node / TypeScript">
```typescript
const conditionFn = iii.registerFunction(
{ id: 'conditions::requireContext' },
async (input) => input.context?.userId != null,
)
const fn = iii.registerFunction({ id: 'onJoin' }, (input) => {
console.log('User joined:', input.context?.userId, input.stream_name)
return {}
})
iii.registerTrigger({
type: 'stream:join',
function_id: fn.id,
config: { condition_function_id: conditionFn.id },
})
iii.register_function({'id': 'conditions::requireContext'}, require_context)
def on_join(input): print('User joined:', input.get('context', {}).get('userId'), input['stream_name']) return {}
iii.register_function({'id': 'onJoin'}, on_join) iii.register_trigger({ 'type': 'stream:join', 'function_id': 'onJoin', 'config': {'condition_function_id': 'conditions::requireContext'}, })
</Tab>
<Tab title="Rust">
```rust
iii.register_function(RegisterFunctionMessage { id: "conditions::requireContext".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let has_user = input
.get("context")
.and_then(|c| c.get("userId"))
.is_some();
Ok(json!(has_user))
});
iii.register_function(RegisterFunctionMessage { id: "onJoin".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let user_id = input.get("context").and_then(|c| c.get("userId")).and_then(|u| u.as_str()).unwrap_or("");
let stream = input["stream_name"].as_str().unwrap_or("");
println!("User joined: {} {}", user_id, stream);
Ok(json!({}))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({
"condition_function_id": "conditions::requireContext"
}) })?;