docs/examples/conditions.mdx
Conditions are regular iii functions that the engine calls before a trigger's handler runs. Attach a condition to any trigger by adding the condition function's ID to the trigger config. If the condition returns false, the handler function is not called.
graph LR
Trigger -->|event data| Condition{{"condition fn"}}
Condition -->|"true"| Handler[handler fn]
Condition -->|"false"| Skip[handler not called]
The condition function receives the full API request. If it returns false, the engine responds with 422 Unprocessable Entity and the handler function is not called.
import { registerWorker, Logger, TriggerAction } from 'iii-sdk'
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')
iii.registerFunction(
{ id: 'conditions::is_premium' },
async (req: ApiRequest<{ amount: number }>) => {
return (req.body?.amount ?? 0) > 1000
},
)
iii.registerFunction(
{ id: 'orders::premium', description: 'Processes premium orders' },
async (req: ApiRequest<{ amount: number; description: string }>) => {
const logger = new Logger()
logger.info('Processing premium order', { amount: req.body?.amount })
return { status_code: 200, body: { message: 'Premium order processed' } }
},
)
iii.registerTrigger({
type: 'http',
function_id: 'orders::premium',
config: {
api_path: 'orders/premium',
http_method: 'POST',
condition_function_id: 'conditions::is_premium',
},
})
from iii import register_worker, InitOptions, ApiRequest, ApiResponse, Logger, TriggerAction
iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="orders-worker"))
def is_premium(data) -> bool:
req = ApiRequest(**data) if isinstance(data, dict) else data
return (req.body or {}).get("amount", 0) > 1000
def orders_premium(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
logger.info("Processing premium order", {"amount": (req.body or {}).get("amount")})
return ApiResponse(status_code=200, body={"message": "Premium order processed"})
iii.register_function({"id": "conditions::is_premium"}, is_premium)
iii.register_function({"id": "orders::premium"}, orders_premium)
iii.register_trigger({
"type": "http",
"function_id": "orders::premium",
"config": {
"api_path": "orders/premium",
"http_method": "POST",
"condition_function_id": "conditions::is_premium",
},
})
use iii_sdk::{register_worker, InitOptions, Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let iii = register_worker("ws://127.0.0.1:49134", InitOptions::default());
iii.register_function(RegisterFunctionMessage { id: "conditions::is_premium".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let amount = input["body"]["amount"].as_f64().unwrap_or(0.0);
Ok(json!(amount > 1000.0))
});
iii.register_function(RegisterFunctionMessage { id: "orders::premium".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
let amount = input["body"]["amount"].as_f64().unwrap_or(0.0);
logger.info("Processing premium order", Some(json!({ "amount": amount })));
Ok(json!({ "status_code": 200, "body": { "message": "Premium order processed" } }))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "http".into(), function_id: "orders::premium".into(), config: json!({
"api_path": "orders/premium",
"http_method": "POST",
"condition_function_id": "conditions::is_premium",
}) })?;
loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}
The condition function receives the queue message data. If it returns false, the handler function is not called — no error is raised.
iii.registerFunction(
{ id: 'conditions::is_high_value' },
async (data: { amount: number }) => {
return data.amount > 1000
},
)
iii.registerFunction(
{ id: 'orders::high_value', description: 'Processes high-value orders from queue' },
async (data: { amount: number; description: string }) => {
const logger = new Logger()
logger.info('Processing high-value order', { amount: data.amount })
await iii.trigger({
function_id: 'enqueue',
payload: { topic: 'order.processed', data: { ...data, processedAt: new Date().toISOString() } },
action: TriggerAction.Void(),
})
},
)
iii.registerTrigger({
type: 'queue',
function_id: 'orders::high_value',
config: { topic: 'order.created', condition_function_id: 'conditions::is_high_value' },
})
from datetime import datetime, timezone
from iii import Logger
def is_high_value(data) -> bool:
if isinstance(data, dict):
return data.get("amount", 0) > 1000
return False
def orders_high_value(data: dict) -> None:
logger = Logger()
amount = data.get("amount", 0)
logger.info("Processing high-value order", {"amount": amount})
if amount <= 1000:
logger.info("Low-value order — skipping", {"amount": amount})
return
iii.trigger({
"function_id": "enqueue",
"payload": {"topic": "order.processed", "data": {**data, "processedAt": datetime.now(timezone.utc).isoformat()}},
"action": TriggerAction.Void(),
})
iii.register_function({"id": "conditions::is_high_value"}, is_high_value)
iii.register_function({"id": "orders::high_value"}, orders_high_value)
iii.register_trigger({
"type": "queue",
"function_id": "orders::high_value",
"config": {"topic": "order.created", "condition_function_id": "conditions::is_high_value"},
})
use iii_sdk::{Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "conditions::is_high_value".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let amount = input["amount"].as_f64().unwrap_or(0.0);
Ok(json!(amount > 1000.0))
});
iii.register_function(RegisterFunctionMessage { id: "orders::high_value".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
let amount = input["amount"].as_f64().unwrap_or(0.0);
logger.info("Processing high-value order", Some(json!({ "amount": amount })));
iii.trigger(TriggerRequest::new("enqueue", json!({
"topic": "order.processed",
"data": { "amount": amount, "processedAt": chrono::Utc::now().to_rfc3339() },
})).action(TriggerAction::void())).await?;
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "queue".into(), function_id: "orders::high_value".into(), config: json!({
"topic": "order.created",
"condition_function_id": "conditions::is_high_value",
}) })?;
The condition receives the full state event, including the event type (created, updated, deleted), scope, key, and both old and new values.
iii.registerFunction(
{ id: 'conditions::is_update' },
async (event: { event_type: string; scope: string; key: string; old_value: unknown; new_value: unknown }) => {
return event.event_type === 'updated'
},
)
iii.registerFunction(
{ id: 'orders::on_update', description: 'Reacts to order state updates' },
async (event) => {
const logger = new Logger()
logger.info('Order updated', { scope: event.scope, key: event.key })
},
)
iii.registerTrigger({
type: 'state',
function_id: 'orders::on_update',
config: { scope: 'orders', condition_function_id: 'conditions::is_update' },
})
from iii import Logger
def is_update(event) -> bool:
if isinstance(event, dict):
return event.get("event_type") == "updated"
return False
def orders_on_update(event: dict) -> None:
logger = Logger()
logger.info("Order updated", {"scope": event.get("scope"), "key": event.get("key")})
iii.register_function({"id": "conditions::is_update"}, is_update)
iii.register_function({"id": "orders::on_update"}, orders_on_update)
iii.register_trigger({
"type": "state",
"function_id": "orders::on_update",
"config": {"scope": "orders", "condition_function_id": "conditions::is_update"},
})
use iii_sdk::{Logger, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "conditions::is_update".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let event_type = input["event_type"].as_str().unwrap_or("");
Ok(json!(event_type == "updated"))
});
iii.register_function(RegisterFunctionMessage { id: "orders::on_update".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None }, |input| async move {
let logger = Logger();
logger.info("Order updated", Some(json!({
"scope": input["scope"],
"key": input["key"],
})));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { trigger_type: "state".into(), function_id: "orders::on_update".into(), config: json!({
"scope": "orders",
"condition_function_id": "conditions::is_update",
}) })?;
registerFunction / register_function like any other function. They receive the same event data as the handler and must return true or false.condition_function_id to the trigger config with the condition function's ID. This key is the same for all trigger types.false:
422 Unprocessable Entity; the handler function is not called.500 Internal Server Error).