Back to Iii

Manage Shared State

docs/how-to/manage-state.mdx

0.13.010.0 KB
Original Source

Goal

Use the State module to share data between Functions without a separate database.

Steps

1. Enable the State module

yaml
modules:
  - class: modules::state::StateModule
    config:
      adapter:
        class: modules::state::adapters::KvStore
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/state_store.db  # required for file_based

2. Write state

<Tabs> <Tab title="Node / TypeScript"> ```typescript title="state-writer.ts" import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'users::create' }, async (input) => { const logger = new Logger() const userId = crypto.randomUUID() const user = { id: userId, name: input.name, email: input.email }

await iii.trigger({ function_id: 'state::set', payload: { scope: 'users', key: userId, value: user }, })

logger.info('User saved to state', { userId }) return { userId } })

// Then call from another function or worker const { userId } = await iii.trigger({ function_id: 'users::create', payload: { name: 'Alice', email: '[email protected]' }, }) logger.info('Created user', { userId })

</Tab>
<Tab title="Python">
```python title="state_writer.py"
import os
import uuid

from iii import Logger, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def create_user(input):
    logger = Logger()
    user_id = str(uuid.uuid4())
    user = {"id": user_id, "name": input["name"], "email": input["email"]}

    iii.trigger({
        "function_id": "state::set",
        "payload": {"scope": "users", "key": user_id, "value": user},
    })

    logger.info("User saved to state", {"userId": user_id})
    return {"userId": user_id}


iii.register_function({"id": "users::create"}, create_user)

# Then call from another function or worker
result = iii.trigger({
    "function_id": "users::create",
    "payload": {"name": "Alice", "email": "[email protected]"},
})
print("Created user:", result["userId"])
</Tab> <Tab title="Rust"> ```rust title="state_writer.rs" use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunctionMessage, TriggerRequest}; use serde_json::json; use tokio::signal;

#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let url = std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()); let iii = register_worker(&url, InitOptions::default());

let iii_clone = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "users::create".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input| {
        let iii = iii_clone.clone();
        async move {
            let logger = Logger::new();
            let user_id = uuid::Uuid::new_v4().to_string();
            let name = input["name"].as_str().unwrap_or("");
            let email = input["email"].as_str().unwrap_or("");

            iii.trigger(TriggerRequest {
                function_id: "state::set".into(),
                payload: json!({
                    "scope": "users",
                    "key": user_id,
                    "value": { "id": user_id, "name": name, "email": email },
                }),
                action: None,
                timeout_ms: None,
            }).await?;

            logger.info("User saved to state", Some(json!({ "userId": user_id })));
            Ok(json!({ "userId": user_id }))
        }
    },
);

signal::ctrl_c().await?;
Ok(())

}

// Then call from another function or worker let result = iii .trigger(TriggerRequest { function_id: "users::create".into(), payload: json!({ "name": "Alice", "email": "[email protected]" }), action: None, timeout_ms: None, }) .await?; println!("Created user: {}", result["userId"]);

</Tab>
</Tabs>

### 3. Read state

<Tabs>
<Tab title="Node / TypeScript">
```typescript title="state-reader.ts"
iii.registerFunction({ id: 'users::get' }, async (input) => {
  const logger = new Logger()

  const user = await iii.trigger({
    function_id: 'state::get',
    payload: { scope: 'users', key: input.userId },
  })

  logger.info('User retrieved', { userId: input.userId })
  return user
})

// Then call from another function or worker
const user = await iii.trigger({
  function_id: 'users::get',
  payload: { userId: 'some-user-id' },
})
logger.info('Retrieved user', user)
</Tab> <Tab title="Python"> ```python title="state_reader.py" def get_user(input): logger = Logger()
user = iii.trigger({
    "function_id": "state::get",
    "payload": {"scope": "users", "key": input["userId"]},
})

logger.info("User retrieved", {"userId": input["userId"]})
return user

iii.register_function({"id": "users::get"}, get_user)

Then call from another function or worker

user = iii.trigger({ "function_id": "users::get", "payload": {"userId": "some-user-id"}, }) print("Retrieved user:", user)

</Tab>
<Tab title="Rust">
```rust title="state_reader.rs"
let iii_clone = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "users::get".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input| {
        let iii = iii_clone.clone();
        async move {
            let logger = Logger::new();
            let user_id = input["userId"].as_str().unwrap_or("");

            let user = iii.trigger(TriggerRequest {
                function_id: "state::get".into(),
                payload: json!({
                    "scope": "users",
                    "key": user_id,
                }),
                action: None,
                timeout_ms: None,
            }).await?;

            logger.info("User retrieved", Some(json!({ "userId": user_id })));
            Ok(user)
        }
    },
);

// Then call from another function or worker
let user = iii
    .trigger(TriggerRequest {
        function_id: "users::get".into(),
        payload: json!({ "userId": "some-user-id" }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Retrieved user: {:?}", user);
</Tab> </Tabs>

4. Atomic updates

Use state::update with update operations for safe concurrent modifications:

<Tabs> <Tab title="Node / TypeScript"> ```typescript title="state-update.ts" iii.registerFunction({ id: 'users::record-login' }, async (input) => { const logger = new Logger()

await iii.trigger({ function_id: 'state::update', payload: { scope: 'users', key: input.userId, ops: [ { type: 'set', path: 'lastLogin', value: new Date().toISOString() }, { type: 'increment', path: 'loginCount', by: 1 }, ], }, })

logger.info('User login recorded', { userId: input.userId }) return { updated: true } })

// Then call from another function or worker await iii.trigger({ function_id: 'users::record-login', payload: { userId: 'some-user-id' }, }) logger.info('Login recorded')

</Tab>
<Tab title="Python">
```python title="state_update.py"
from datetime import datetime, timezone


def record_login(input):
    logger = Logger()

    iii.trigger({
        "function_id": "state::update",
        "payload": {
            "scope": "users",
            "key": input["userId"],
            "ops": [
                {"type": "set", "path": "lastLogin", "value": datetime.now(timezone.utc).isoformat()},
                {"type": "increment", "path": "loginCount", "by": 1},
            ],
        },
    })

    logger.info("User login recorded", {"userId": input["userId"]})
    return {"updated": True}


iii.register_function({"id": "users::record-login"}, record_login)

# Then call from another function or worker
iii.trigger({
    "function_id": "users::record-login",
    "payload": {"userId": "some-user-id"},
})
print("Login recorded")
</Tab> <Tab title="Rust"> ```rust title="state_update.rs" let iii_clone = iii.clone(); iii.register_function( RegisterFunctionMessage { id: "users::record-login".to_string(), description: None, request_format: None, response_format: None, metadata: None, invocation: None, }, move |input| { let iii = iii_clone.clone(); async move { let logger = Logger::new(); let user_id = input["userId"].as_str().unwrap_or("");
        iii.trigger(TriggerRequest {
            function_id: "state::update".into(),
            payload: json!({
                "scope": "users",
                "key": user_id,
                "ops": [
                    { "type": "set", "path": "lastLogin", "value": chrono::Utc::now().to_rfc3339() },
                    { "type": "increment", "path": "loginCount", "by": 1 },
                ],
            }),
            action: None,
            timeout_ms: None,
        }).await?;

        logger.info("User login recorded", Some(json!({ "userId": user_id })));
        Ok(json!({ "updated": true }))
    }
},

);

// Then call from another function or worker let _ = iii .trigger(TriggerRequest { function_id: "users::record-login".into(), payload: json!({ "userId": "some-user-id" }), action: None, timeout_ms: None, }) .await?; println!("Login recorded");

</Tab>
</Tabs>

## Result

State is shared across all Functions in the system. Any Function can read or write to any scope/key pair. The Engine handles consistency and persistence based on the configured adapter.
{/* <Info title="Production adapters">
  The default `file_based` adapter works for single-instance use. For production with multiple replicas, use a persistent adapter like Redis. See the [State module reference](/modules/module-state).
</Info> */}