docs/advanced/custom-modules.mdx
Custom modules in the iii Engine allow developers to extend the core functionality of the system. A module acts as a container for logic that can register functions, triggers, and integrate with external systems.
Modules are dynamically loaded and configured, often utilizing an Adapter pattern to allow for swappable backend implementations (e.g., swapping an in-memory event bus for a Redis-backed one).
The engine provides a trait-based system where modules implement the CoreModule trait for lifecycle management and the ConfigurableModule trait for handling configuration and adapter injection.
graph TD
Engine[iii Engine] -->|Loads| Module[Custom Module]
Module -->|Uses| Adapter[Adapter Interface]
Adapter -.->|implements| Redis[Redis Adapter]
Adapter -.->|implements| Memory[In-Memory Adapter]
Adapter -.->|implements| Custom[Custom Adapter]
Module -->|Registers| Functions[Functions]
Module -->|Registers| Triggers[Triggers]
The module system is built around two primary traits: CoreModule and ConfigurableModule.
| Trait | Description | Key Methods |
|---|---|---|
| CoreModule | The base trait for all modules. Handles lifecycle, identification, and function registration. | name(), create(), initialize(), register_functions(), start_background_tasks(), destroy() |
| ConfigurableModule | Extends CoreModule to support typed configuration and pluggable adapters. | build(), registry(), adapter_class_from_config() |
The following diagram illustrates the lifecycle of a module from creation to initialization.
sequenceDiagram
participant Engine
participant Builder
participant Module
participant Adapter
Note over Builder, Module: Module Registration Phase
Builder->>Module: create(engine, config)
alt is ConfigurableModule
Module->>Module: create_with_adapters(engine, config)
Module->>Adapter: factory(engine, config)
Adapter-->>Module: Arc<AdapterInstance>
Module->>Module: build(engine, config, adapter)
end
Builder->>Module: initialize()
activate Module
Module->>Engine: register_trigger_type() (Optional)
Module-->>Builder: Result
deactivate Module
Builder->>Module: register_functions(engine)
activate Module
Module->>Engine: register_function()
deactivate Module
Developing a custom module typically involves defining an adapter interface, implementing specific adapters, and then wrapping them in a module structure.
Define an async_trait that specifies the behavior your module's backend must implement. This allows users to switch implementations via configuration.
use async_trait::async_trait;
use serde_json::Value;
#[async_trait]
pub trait CustomEventAdapter: Send + Sync + 'static {
async fn emit(&self, topic: &str, event_data: Value);
async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
async fn unsubscribe(&self, topic: &str, id: &str);
}
Why async_trait? Rust's async traits require this macro to handle the complexity of async function pointers.
To make adapters discoverable by the configuration system, you must define a registration struct and use the inventory crate.
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;
use iii::Engine;
pub type CustomEventAdapterFuture = Pin<
Box<dyn Future<Output = anyhow::Result<Arc<dyn CustomEventAdapter>>> + Send>
>;
pub struct CustomEventAdapterRegistration {
pub class: &'static str,
pub factory: fn(Arc<Engine>, Option<Value>) -> CustomEventAdapterFuture,
}
// Implement AdapterRegistrationEntry trait
impl AdapterRegistrationEntry<dyn CustomEventAdapter> for CustomEventAdapterRegistration {
fn class(&self) -> &'static str {
self.class
}
fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> CustomEventAdapterFuture {
self.factory
}
}
// Register the type with inventory
inventory::collect!(CustomEventAdapterRegistration);
Purpose: This registration system allows the engine to discover and instantiate adapters dynamically based on configuration.
Define factory functions that instantiate your specific adapter implementations (e.g., InMemory or Logging).
use iii::register_adapter;
fn make_inmemory_adapter(
engine: Arc<Engine>,
config: Option<Value>
) -> CustomEventAdapterFuture {
Box::pin(async move {
Ok(Arc::new(InMemoryEventAdapter::new(config, engine).await?)
as Arc<dyn CustomEventAdapter>)
})
}
// Register the specific adapter implementation
register_adapter!(
<CustomEventAdapterRegistration>
"my::InMemoryEventAdapter",
make_inmemory_adapter
);
Create the actual adapter implementations.
<AccordionGroup> <Accordion title="In-Memory Adapter" icon="memory-stick"> Simple in-memory implementation for development and testing.```rust
use std::collections::HashMap;
use tokio::sync::RwLock;
pub struct InMemoryEventAdapter {
subscriptions: Arc<RwLock<HashMap<String, HashMap<String, String>>>>,
engine: Arc<Engine>,
}
impl InMemoryEventAdapter {
pub async fn new(
_config: Option<Value>,
engine: Arc<Engine>
) -> anyhow::Result<Self> {
Ok(Self {
subscriptions: Arc::new(RwLock::new(HashMap::new())),
engine,
})
}
}
#[async_trait]
impl CustomEventAdapter for InMemoryEventAdapter {
async fn emit(&self, topic: &str, event_data: Value) {
let subs = self.subscriptions.read().await;
if let Some(by_id) = subs.get(topic) {
for function_id in by_id.values() {
let _ = self.engine.call(function_id, event_data.clone()).await;
}
}
}
async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
let mut subs = self.subscriptions.write().await;
subs.entry(topic.to_string())
.or_insert_with(HashMap::new)
.insert(id.to_string(), function_id.to_string());
}
async fn unsubscribe(&self, topic: &str, id: &str) {
let mut subs = self.subscriptions.write().await;
if let Some(by_id) = subs.get_mut(topic) {
by_id.remove(id);
}
}
}
```
```rust
pub struct LoggingEventAdapter {
inner: Arc<dyn CustomEventAdapter>,
}
#[async_trait]
impl CustomEventAdapter for LoggingEventAdapter {
async fn emit(&self, topic: &str, event_data: Value) {
tracing::info!(
topic = %topic,
event_data = %event_data,
"Emitting event"
);
self.inner.emit(topic, event_data).await;
}
async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
tracing::info!(topic = %topic, "Subscribing to topic");
self.inner.subscribe(topic, id, function_id).await;
}
async fn unsubscribe(&self, topic: &str, id: &str) {
tracing::info!(topic = %topic, "Unsubscribing from topic");
self.inner.unsubscribe(topic, id).await;
}
}
```
The module struct holds the Engine reference and the injected Adapter.
use serde::Deserialize;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
#[serde(default)]
pub adapter: Option<AdapterEntry>,
}
#[derive(Clone)]
pub struct CustomEventModule {
adapter: Arc<dyn CustomEventAdapter>,
engine: Arc<Engine>,
_config: CustomEventModuleConfig,
}
#[async_trait]
impl ConfigurableModule for CustomEventModule {
type Config = CustomEventModuleConfig;
type Adapter = dyn CustomEventAdapter;
type AdapterRegistration = CustomEventAdapterRegistration;
const DEFAULT_ADAPTER_CLASS: &'static str = "my::InMemoryEventAdapter";
// Define how to access the registry
async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomEventAdapter>>>> =
Lazy::new(|| RwLock::new(CustomEventModule::build_registry()));
®ISTRY
}
// Builder method
fn build(
engine: Arc<Engine>,
config: Self::Config,
adapter: Arc<Self::Adapter>
) -> Self {
Self {
engine,
_config: config,
adapter
}
}
}
Modules expose functionality to the engine (and thus to workers) by registering functions. This is typically done in the initialize method or register_functions.
When registering a function, you must provide a RegisterFunctionRequest.
| Field | Type | Description |
|---|---|---|
function_id | String | Unique function ID (e.g., "custom::emit") |
description | Option<String> | Human-readable description of the function |
request_format | Option<Value> | JSON Schema defining the expected input |
response_format | Option<Value> | JSON Schema defining the expected output |
use iii::RegisterFunctionRequest;
#[async_trait]
impl CoreModule for CustomEventModule {
fn name(&self) -> &str {
"custom_event"
}
async fn initialize(&self) -> anyhow::Result<()> {
self.engine.register_function(
RegisterFunctionRequest {
function_id: "custom::emit".to_string(),
description: Some("Emit a custom event".to_string()),
request_format: Some(serde_json::json!({
"type": "object",
"properties": {
"topic": { "type": "string" },
"data": { "type": "object" }
},
"required": ["topic", "data"]
})),
response_format: None,
metadata: None,
},
Box::new(self.clone()), // The handler
);
Ok(())
}
async fn register_functions(&self, _engine: Arc<Engine>) -> anyhow::Result<()> {
// Additional function registrations can go here
Ok(())
}
}
To handle invocations, the module (or a specific handler struct) must implement the FunctionHandler trait.
use iii::{FunctionHandler, FunctionResult};
#[async_trait]
impl FunctionHandler for CustomEventModule {
async fn handle(&self, input: Value) -> FunctionResult {
// 1. Parse Input
let topic = input.get("topic")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'topic' field"))?;
let data = input.get("data")
.cloned()
.unwrap_or(Value::Null);
// 2. Execute Logic (using the adapter)
self.adapter.emit(topic, data).await;
// 3. Return Result
FunctionResult::Success(None)
}
}
Modules can also act as sources of events by registering TriggerTypes. This allows the engine to route external events (like Cron ticks or HTTP requests) to specific functions.
graph TD
Module[Core Module] -->|Registers| TT[TriggerType]
TT -->|Contains| Registrator[TriggerRegistrator]
Worker -->|Sends registertrigger| Engine
Engine -->|Delegates to| Registrator
Registrator -->|Stores| TriggerDefinition
ExternalEvent[External Event e.g., Timer/HTTP] --> Module
Module -->|Look up| TriggerDefinition
Module -->|Invoke| Engine
Engine -->|Route to| Worker
To support triggers, a module implements TriggerRegistrator.
use iii::{TriggerRegistrator, Trigger, TriggerType};
use std::future::Future;
use std::pin::Pin;
impl TriggerRegistrator for CustomEventModule {
fn register_trigger(
&self,
trigger: Trigger,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
Box::pin(async move {
// Extract configuration
let config = trigger.config;
let subscribes = config.get("subscribes")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("Missing 'subscribes' array"))?;
// Subscribe to each topic
for topic in subscribes {
let topic_str = topic.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid topic"))?;
self.adapter.subscribe(
topic_str,
&trigger.id,
&trigger.function_id
).await;
}
Ok(())
})
}
}
Then, register the trigger type during initialization:
async fn initialize(&self) -> anyhow::Result<()> {
// Register functions
// ... (function registration code)
// Register trigger type
let trigger_type = TriggerType {
id: "event".to_string(),
registrator: Box::new(self.clone()),
description: Some("Event-based trigger".to_string()),
};
self.engine.register_trigger_type(trigger_type).await?;
Ok(())
}
Modules are configured via iii-config.yaml or JSON passed during initialization. The ConfigurableModule trait maps this configuration to a Rust struct.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
#[serde(default)]
pub adapter: Option<AdapterEntry>,
}
modules:
- class: my::CustomEventModule
config:
adapter:
class: my::LoggingEventAdapter
config:
inner_adapter: my::InMemoryEventAdapter
Nested Adapters: The logging adapter wraps the in-memory adapter, creating a decorator pattern for cross-cutting concerns.
Here's a complete custom module implementation:
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use iii::{
Engine, CoreModule, ConfigurableModule, FunctionHandler,
FunctionResult, RegisterFunctionRequest, TriggerRegistrator,
Trigger, TriggerType, AdapterEntry
};
// 1. Define Adapter Trait
#[async_trait]
pub trait CustomEventAdapter: Send + Sync + 'static {
async fn emit(&self, topic: &str, event_data: Value);
async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
async fn unsubscribe(&self, topic: &str, id: &str);
}
// 2. Implement In-Memory Adapter
pub struct InMemoryEventAdapter {
subscriptions: Arc<RwLock<HashMap<String, HashMap<String, String>>>>,
engine: Arc<Engine>,
}
impl InMemoryEventAdapter {
pub async fn new(_config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
Ok(Self {
subscriptions: Arc::new(RwLock::new(HashMap::new())),
engine,
})
}
}
#[async_trait]
impl CustomEventAdapter for InMemoryEventAdapter {
async fn emit(&self, topic: &str, event_data: Value) {
let subs = self.subscriptions.read().await;
if let Some(by_id) = subs.get(topic) {
for function_id in by_id.values() {
let _ = self.engine.call(function_id, event_data.clone()).await;
}
}
}
async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
let mut subs = self.subscriptions.write().await;
subs.entry(topic.to_string())
.or_insert_with(HashMap::new)
.insert(id.to_string(), function_id.to_string());
}
async fn unsubscribe(&self, topic: &str, id: &str) {
let mut subs = self.subscriptions.write().await;
if let Some(by_id) = subs.get_mut(topic) {
by_id.remove(id);
}
}
}
// 3. Define Module Configuration
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
#[serde(default)]
pub adapter: Option<AdapterEntry>,
}
// 4. Implement Module
#[derive(Clone)]
pub struct CustomEventModule {
adapter: Arc<dyn CustomEventAdapter>,
engine: Arc<Engine>,
_config: CustomEventModuleConfig,
}
#[async_trait]
impl CoreModule for CustomEventModule {
fn name(&self) -> &str {
"custom_event"
}
async fn initialize(&self) -> anyhow::Result<()> {
// Register emit function
self.engine.register_function(
RegisterFunctionRequest {
function_id: "custom::emit".to_string(),
description: Some("Emit a custom event".to_string()),
request_format: Some(serde_json::json!({
"type": "object",
"properties": {
"topic": { "type": "string" },
"data": { "type": "object" }
},
"required": ["topic", "data"]
})),
response_format: None,
metadata: None,
},
Box::new(self.clone()),
);
// Register trigger type
let trigger_type = TriggerType {
id: "event".to_string(),
registrator: Box::new(self.clone()),
description: Some("Event-based trigger".to_string()),
};
self.engine.register_trigger_type(trigger_type).await?;
Ok(())
}
}
#[async_trait]
impl FunctionHandler for CustomEventModule {
async fn handle(&self, input: Value) -> FunctionResult {
let topic = input.get("topic").and_then(|v| v.as_str()).unwrap_or("");
let data = input.get("data").cloned().unwrap_or(Value::Null);
self.adapter.emit(topic, data).await;
FunctionResult::Success(None)
}
}
impl TriggerRegistrator for CustomEventModule {
fn register_trigger(
&self,
trigger: Trigger,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
Box::pin(async move {
let config = trigger.config;
let subscribes = config.get("subscribes")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("Missing 'subscribes' array"))?;
for topic in subscribes {
let topic_str = topic.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid topic"))?;
self.adapter.subscribe(topic_str, &trigger.id, &trigger.function_id).await;
}
Ok(())
})
}
}
modules:
- class: my::CustomEventModule
config:
adapter:
class: my::InMemoryEventAdapter
```rust
// Good: Adapter-based design
pub trait StorageAdapter {
async fn save(&self, key: &str, value: Value);
}
// Avoid: Hard-coded implementation
pub struct Module {
redis: RedisClient, // Tightly coupled
}
```
```rust
impl Drop for CustomEventModule {
fn drop(&mut self) {
// Clean up resources
tracing::info!("Shutting down CustomEventModule");
}
}
```
```rust
request_format: Some(serde_json::json!({
"type": "object",
"properties": {
"email": { "type": "string", "format": "email" },
"age": { "type": "number", "minimum": 0 }
},
"required": ["email"]
}))
```
```rust
tracing::info!(
topic = %topic,
subscriber_count = by_id.len(),
"Emitting event to subscribers"
);
```