docs/modules/module-pubsub.mdx
Topic-based publish/subscribe messaging for broadcasting events to multiple subscribers in real time.
modules::pubsub::PubSubModule
```yaml
- class: modules::pubsub::PubSubModule
  config:
    adapter:
      class: modules::pubsub::LocalAdapter
```
In-memory pub/sub using broadcast channels. Messages are delivered only to subscribers running in the same engine process. No external dependencies required.
```yaml
class: modules::pubsub::LocalAdapter
```
Uses Redis Pub/Sub as the backend. Enables event delivery across multiple engine instances.
```yaml
class: modules::pubsub::RedisAdapter
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
```
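
The `redis_url` value above uses a `${NAME:default}` placeholder. The sketch below shows how such a placeholder is commonly resolved. It assumes the placeholder means "use the environment variable `REDIS_URL` if it is set, otherwise fall back to the text after the first colon". The `resolve_placeholder` helper is hypothetical and is not part of the engine.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the default may itself contain colons.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def resolve_placeholder(value, env=None):
    """Expand ${NAME:default} placeholders (hypothetical helper, assumed semantics)."""
    env = os.environ if env is None else env

    def _sub(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        # Fall back to the default, or to an empty string when none is given.
        return default if default is not None else ""

    return _PLACEHOLDER.sub(_sub, value)
```

Under this assumption, leaving `REDIS_URL` unset points the adapter at a local Redis on port 6379, and setting it overrides the URL without editing the config file.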
This module adds a new trigger type: `subscribe`.
The handler receives the raw value passed as `data` to the `publish` call. No envelope is added.
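<Tabs>
<Tab title="TypeScript">
```typescript
// Register the handler; it receives the published `data` value as-is.
const fn = iii.registerFunction(
  { id: 'notifications::onOrderShipped' },
  async (data) => {
    console.log('Order shipped:', data)
    return {}
  },
)

// Subscribe the handler to the topic.
iii.registerTrigger({
  type: 'subscribe',
  function_id: fn.id,
  config: { topic: 'orders.shipped' },
})

// Publish an event without waiting for a result.
await iii.trigger({
  function_id: 'publish',
  payload: {
    topic: 'orders.shipped',
    data: { orderId: 'abc-123', address: '123 Main St' },
  },
  action: TriggerAction.Void(),
})
```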
</Tab>
<Tab title="Python">
```python
def on_order_shipped(data):
    print('Order shipped:', data)
    return {}

iii.register_function({'id': 'notifications::onOrderShipped'}, on_order_shipped)
iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::onOrderShipped', 'config': {'topic': 'orders.shipped'}})

iii.trigger({
    'function_id': 'publish',
    'payload': {
        'topic': 'orders.shipped',
        'data': {'orderId': 'abc-123', 'address': '123 Main St'},
    },
})
```
</Tab>
<Tab title="Rust">
```rust
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::onOrderShipped".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "address": "123 Main St" }
    }),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
}).await?;
```
</Tab>
</Tabs>
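
To make the "no envelope" behavior concrete, here is a minimal in-process sketch of topic-based delivery. `LocalBroker` is a hypothetical stand-in written for illustration; it is not the engine's adapter and does not use the `iii` SDK. It only shows that a handler receives exactly the value that was published, with no topic name or metadata wrapped around it.

```python
from collections import defaultdict


class LocalBroker:
    """Toy in-process broker (illustrative only, not the engine's adapter)."""

    def __init__(self):
        # Maps each topic name to the list of handlers subscribed to it.
        self._handlers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._handlers[topic].append(handler)

    def publish(self, topic, data):
        # Hand the published value to every handler unchanged:
        # no envelope, no topic name, no metadata.
        for handler in list(self._handlers[topic]):
            handler(data)


received = []
broker = LocalBroker()
broker.subscribe("orders.shipped", received.append)
broker.publish("orders.shipped", {"orderId": "abc-123", "address": "123 Main St"})
broker.publish("orders.cancelled", {"orderId": "zzz-999"})  # no subscribers, so dropped
```

After this runs, `received` holds only the `orders.shipped` payload, in the same shape the publisher passed in.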
### Usage Example: Fanout Notification
One publisher triggers two independent subscribers on the same topic:
<Tabs>
<Tab title="TypeScript">
```typescript
const emailFn = iii.registerFunction(
  { id: 'notifications::sendEmailAlert' },
  async (data) => {
    await sendEmail(data.userId, `Order ${data.orderId} shipped`)
    return {}
  },
)

const pushFn = iii.registerFunction(
  { id: 'notifications::sendPushAlert' },
  async (data) => {
    await sendPushNotification(data.userId, `Order ${data.orderId} shipped`)
    return {}
  },
)

iii.registerTrigger({
  type: 'subscribe',
  function_id: emailFn.id,
  config: { topic: 'orders.shipped' },
})

iii.registerTrigger({
  type: 'subscribe',
  function_id: pushFn.id,
  config: { topic: 'orders.shipped' },
})

await iii.trigger({
  function_id: 'publish',
  payload: {
    topic: 'orders.shipped',
    data: { orderId: 'abc-123', userId: 'user-456' },
  },
})
```
</Tab>
<Tab title="Python">
```python
def send_email_alert(data):
    send_email(data['userId'], f"Order {data['orderId']} shipped")
    return {}

def send_push_alert(data):
    send_push_notification(data['userId'], f"Order {data['orderId']} shipped")
    return {}

iii.register_function({'id': 'notifications::sendEmailAlert'}, send_email_alert)
iii.register_function({'id': 'notifications::sendPushAlert'}, send_push_alert)

iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::sendEmailAlert', 'config': {'topic': 'orders.shipped'}})
iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::sendPushAlert', 'config': {'topic': 'orders.shipped'}})

iii.trigger({
    'function_id': 'publish',
    'payload': {
        'topic': 'orders.shipped',
        'data': {'orderId': 'abc-123', 'userId': 'user-456'},
    },
})
```
</Tab>
<Tab title="Rust">
```rust
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput, TriggerRequest};
use serde_json::json;

iii.register_function(
    RegisterFunctionMessage { id: "notifications::sendEmailAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
    |data| async move {
        let order_id = data["orderId"].as_str().unwrap_or("");
        let user_id = data["userId"].as_str().unwrap_or("");
        send_email(user_id, &format!("Order {} shipped", order_id)).await?;
        Ok(json!({}))
    },
);

iii.register_function(
    RegisterFunctionMessage { id: "notifications::sendPushAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
    |data| async move {
        let order_id = data["orderId"].as_str().unwrap_or("");
        let user_id = data["userId"].as_str().unwrap_or("");
        send_push_notification(user_id, &format!("Order {} shipped", order_id)).await?;
        Ok(json!({}))
    },
);

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendEmailAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendPushAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "userId": "user-456" }
    }),
    action: None,
    timeout_ms: None,
}).await?;
```
</Tab>
</Tabs>

| Feature | PubSub | Queue |
|---|---|---|
| Delivery | Broadcast to all subscribers | Single consumer per message |
| Persistence | No (fire-and-forget) | Yes (with retries and DLQ) |
| Ordering | Not guaranteed | FIFO within topic |
| Best for | Real-time notifications, fanout | Reliable background processing |
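
The delivery row of the table can be illustrated with a small sketch that contrasts the two models. Both functions are hypothetical helpers written for this comparison and are not part of either module. Round-robin assignment is an assumption chosen to show "single consumer per message", and retries and the DLQ are not modeled.

```python
from collections import deque


def broadcast(subscribers, message):
    """Pub/sub style: every subscriber receives the same message."""
    for inbox in subscribers:
        inbox.append(message)


def drain_queue(queue, consumers):
    """Queue style: each message goes to exactly one consumer, in FIFO order."""
    i = 0
    while queue:
        # Round-robin is an illustrative choice, not the queue module's policy.
        consumers[i % len(consumers)].append(queue.popleft())
        i += 1


email_inbox, push_inbox = [], []
broadcast([email_inbox, push_inbox], "order-1")

worker_a, worker_b = [], []
drain_queue(deque(["job-1", "job-2", "job-3"]), [worker_a, worker_b])
```

With broadcast, both inboxes end up holding `order-1`. With the queue, the three jobs are split between the two workers and no job is processed twice.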
```mermaid
sequenceDiagram
    participant Publisher as Publisher
    participant E as Engine
    participant A as PubSub Adapter
    participant S1 as Subscriber1
    participant S2 as Subscriber2
    Publisher->>E: publish (topic: orders.shipped)
    E->>A: Broadcast event
    A->>E: Notify all subscribers
    E->>S1: Invoke handler
    E->>S2: Invoke handler
    S1-->>E: Complete
    S2-->>E: Complete
```
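
The flow in the diagram can be approximated with a short `asyncio` sketch. This is a simplified model under stated assumptions, not the engine's implementation: the `publish` coroutine here is hypothetical, it collapses the engine and adapter into one step, and it assumes handlers are invoked concurrently and awaited together, which the diagram does not specify.

```python
import asyncio


async def publish(subscribers, topic, data):
    """Invoke every handler subscribed to `topic` and wait for all of them."""
    handlers = subscribers.get(topic, [])
    # Each handler gets the same payload; gather collects their return values.
    return await asyncio.gather(*(handler(data) for handler in handlers))


async def main():
    log = []

    async def subscriber1(data):
        log.append(("S1", data["orderId"]))
        return {}

    async def subscriber2(data):
        log.append(("S2", data["orderId"]))
        return {}

    subscribers = {"orders.shipped": [subscriber1, subscriber2]}
    results = await publish(subscribers, "orders.shipped", {"orderId": "abc-123"})
    return log, results


log, results = asyncio.run(main())
```

Both subscribers are invoked once for the single published event, matching the two "Invoke handler" and "Complete" arrows in the diagram.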