AgentSubscriber

The AgentSubscriber trait provides an event-driven system for handling agent lifecycle events, message updates, and state mutations during agent execution.

rust
use ag_ui_client::subscriber::AgentSubscriber;

Overview

Implement any subset of the callback methods to react to events. Each handler returns an AgentStateMutation: the default value changes nothing, while a populated one can replace the in-flight messages and/or state and stop propagation to later subscribers.

Subscriber callbacks are async functions; the trait relies on async_trait, so annotate each impl block with #[async_trait].

Adding subscribers to agents

Agent::run_agent accepts subscribers in any form that implements IntoSubscribers, including:

  • T where T: AgentSubscriber
  • (T,) single-element tuple
  • Vec<T>
  • &[T]
  • () or Option<()> when no subscribers are needed
rust
use ag_ui_client::{Agent, HttpAgent};
use ag_ui_client::agent::RunAgentParams;
use ag_ui_client::subscriber::AgentSubscriber;
use async_trait::async_trait;

struct Logger;

#[async_trait]
impl AgentSubscriber for Logger {
    async fn on_text_message_content_event(
        &self,
        event: &ag_ui_client::core::event::TextMessageContentEvent,
        _buffer: &str,
        _params: ag_ui_client::subscriber::AgentSubscriberParams<'async_trait, serde_json::Value, serde_json::Value>,
    ) -> Result<ag_ui_client::agent::AgentStateMutation, ag_ui_client::agent::AgentError> {
        println!("chunk: {}", event.content);
        Ok(Default::default())
    }
}

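// Assumes `agent` was constructed earlier and that this code runs inside an
// async function whose error type can absorb `?`.
let params = RunAgentParams::new().user("Hello!");
let result = agent.run_agent(&params, [Logger]).await?;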
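
To make the idea of "several accepted forms" concrete, here is a minimal, self-contained sketch of how a conversion trait can let one parameter accept `()`, a one-element tuple, or a `Vec`. The names `Subscriber`, `IntoSubscriberList`, and `run_with` are hypothetical stand-ins invented for this illustration; they are not the definitions used by ag_ui_client, and the crate's own IntoSubscribers may be implemented quite differently.

```rust
// Illustrative stand-ins only; NOT the ag_ui_client definitions.
pub trait Subscriber {
    fn name(&self) -> String;
}

/// Hypothetical conversion trait: anything that can become a list of boxed subscribers.
pub trait IntoSubscriberList {
    fn into_list(self) -> Vec<Box<dyn Subscriber>>;
}

// `()` means "no subscribers".
impl IntoSubscriberList for () {
    fn into_list(self) -> Vec<Box<dyn Subscriber>> {
        Vec::new()
    }
}

// A single-element tuple wraps exactly one subscriber.
impl<T: Subscriber + 'static> IntoSubscriberList for (T,) {
    fn into_list(self) -> Vec<Box<dyn Subscriber>> {
        vec![Box::new(self.0) as Box<dyn Subscriber>]
    }
}

// A Vec of one concrete subscriber type keeps its order.
impl<T: Subscriber + 'static> IntoSubscriberList for Vec<T> {
    fn into_list(self) -> Vec<Box<dyn Subscriber>> {
        self.into_iter()
            .map(|s| Box::new(s) as Box<dyn Subscriber>)
            .collect()
    }
}

pub struct Logger;

impl Subscriber for Logger {
    fn name(&self) -> String {
        "logger".to_string()
    }
}

/// Stand-in for a `run_agent`-style entry point: accepts any convertible form
/// and returns the subscriber names in registration order.
pub fn run_with<S: IntoSubscriberList>(subscribers: S) -> Vec<String> {
    subscribers.into_list().iter().map(|s| s.name()).collect()
}
```

With this shape, `run_with(())`, `run_with((Logger,))`, and `run_with(vec![Logger, Logger])` all type-check against the same function signature, which is the convenience the list above describes.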

AgentStateMutation

Handlers may return a mutation to modify state/messages or stop propagation:

rust
use ag_ui_client::agent::AgentStateMutation;

AgentStateMutation {
    messages: None,
    state: None,
    stop_propagation: false,
}
  • messages: Option<Vec<Message>> – replace current message history
  • state: Option<StateT> – replace current agent state
  • stop_propagation: bool – if true, subsequent subscribers won’t receive this event
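
The three fields can be read as instructions to whatever loop is dispatching an event. The sketch below models that loop with simplified, synchronous stand-in types (`Mutation`, `Handler`, `dispatch`, and the three handler functions are all hypothetical names, with `String` messages and an `i64` state). It assumes each mutation is applied before the next handler runs, which the text above does not state explicitly; treat it as one plausible reading rather than the crate's implementation.

```rust
// Simplified stand-ins for illustration; NOT the ag_ui_client types.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Mutation {
    pub messages: Option<Vec<String>>, // the real field holds Vec<Message>
    pub state: Option<i64>,            // the real field holds the generic StateT
    pub stop_propagation: bool,
}

/// A hypothetical synchronous handler: sees the current messages and state,
/// and returns a mutation.
pub type Handler = fn(&[String], &i64) -> Mutation;

/// Calls handlers in order, applying each mutation before the next handler runs.
/// Returns how many handlers were invoked.
pub fn dispatch(handlers: &[Handler], messages: &mut Vec<String>, state: &mut i64) -> usize {
    let mut invoked = 0;
    for handler in handlers {
        invoked += 1;
        let mutation = handler(messages.as_slice(), &*state);
        if let Some(new_messages) = mutation.messages {
            *messages = new_messages; // replaces the history, does not append
        }
        if let Some(new_state) = mutation.state {
            *state = new_state;
        }
        if mutation.stop_propagation {
            break; // later handlers never see this event
        }
    }
    invoked
}

// Returns the default mutation: nothing changes.
fn observe(_messages: &[String], _state: &i64) -> Mutation {
    Mutation::default()
}

// Replaces the state and stops propagation.
fn bump_and_stop(_messages: &[String], state: &i64) -> Mutation {
    Mutation { state: Some(*state + 1), stop_propagation: true, ..Default::default() }
}

// Would clear the messages, but is never reached in the demo below.
fn clear_messages(_messages: &[String], _state: &i64) -> Mutation {
    Mutation { messages: Some(Vec::new()), ..Default::default() }
}

pub fn demo_dispatch() -> (usize, Vec<String>, i64) {
    let handlers: [Handler; 3] = [observe, bump_and_stop, clear_messages];
    let mut messages = vec!["hello".to_string()];
    let mut state = 41;
    let invoked = dispatch(&handlers, &mut messages, &mut state);
    (invoked, messages, state)
}
```

In `demo_dispatch`, the second handler sets `stop_propagation`, so only two handlers run, the state becomes 42, and the message list is left untouched.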

AgentSubscriberParams

Common parameters passed to most subscriber methods:

rust
use ag_ui_client::subscriber::AgentSubscriberParams;
use ag_ui_client::core::types::{Message, RunAgentInput};

pub struct AgentSubscriberParams<'a, StateT, FwdPropsT> {
    pub messages: &'a [Message],
    pub state: &'a StateT,
    pub input: &'a RunAgentInput<StateT, FwdPropsT>,
}

Event callbacks

You can implement specific typed callbacks or the catch‑all on_event. Key callbacks include:

  • on_run_initialized
  • on_run_failed
  • on_run_finalized
  • on_run_started_event
  • on_run_finished_event
  • on_run_error_event
  • on_step_started_event / on_step_finished_event
  • on_text_message_start_event / on_text_message_content_event / on_text_message_end_event
  • on_tool_call_start_event / on_tool_call_args_event / on_tool_call_end_event / on_tool_call_result_event
  • on_state_snapshot_event / on_state_delta_event
  • on_messages_snapshot_event
  • on_raw_event / on_custom_event

Every callback has a default implementation that returns Ok(Default::default()), which is a mutation that changes nothing. Implement only the callbacks you need.
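
The "override only what you need" pattern rests on ordinary Rust default trait methods. The following sketch shows it with a small synchronous stand-in trait; `Subscriber`, `Mutation`, `Transcript`, and the shortened callback names are hypothetical and chosen for this illustration, whereas the real trait is async and generic over state. Because the callbacks shown in this document take `&self`, the sketch uses a `Mutex` for accumulated data.

```rust
use std::sync::Mutex;

// Simplified, synchronous stand-ins; NOT the ag_ui_client definitions.
#[derive(Debug, Default, PartialEq)]
pub struct Mutation {
    pub stop_propagation: bool,
}

pub trait Subscriber {
    // Each callback has a default body that returns an empty mutation.
    fn on_run_started(&self) -> Result<Mutation, String> {
        Ok(Default::default())
    }
    fn on_text_content(&self, _delta: &str) -> Result<Mutation, String> {
        Ok(Default::default())
    }
    fn on_run_finished(&self) -> Result<Mutation, String> {
        Ok(Default::default())
    }
}

/// Collects streamed text. Callbacks take `&self`, so interior mutability is needed.
#[derive(Default)]
pub struct Transcript {
    text: Mutex<String>,
}

impl Subscriber for Transcript {
    // Only this callback is overridden; the other two use the defaults.
    fn on_text_content(&self, delta: &str) -> Result<Mutation, String> {
        self.text.lock().map_err(|e| e.to_string())?.push_str(delta);
        Ok(Default::default())
    }
}

pub fn demo_transcript() -> Result<String, String> {
    let transcript = Transcript::default();
    transcript.on_run_started()?; // default: does nothing
    transcript.on_text_content("Hel")?;
    transcript.on_text_content("lo")?;
    transcript.on_run_finished()?; // default: does nothing
    let text = transcript.text.lock().map_err(|e| e.to_string())?.clone();
    Ok(text)
}
```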

Example: tracking state snapshots and deltas

rust
use ag_ui_client::subscriber::{AgentSubscriber, AgentSubscriberParams};
use ag_ui_client::agent::{AgentStateMutation, AgentError};
use ag_ui_client::core::event::{StateSnapshotEvent, StateDeltaEvent};
use async_trait::async_trait;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
struct Plan { steps: Vec<String> }

struct Logger;

#[async_trait]
impl AgentSubscriber<Plan, ()> for Logger {
    async fn on_state_snapshot_event(
        &self,
        event: &StateSnapshotEvent<Plan>,
        _params: AgentSubscriberParams<'async_trait, Plan, ()>,
    ) -> Result<AgentStateMutation<Plan>, AgentError> {
        println!("snapshot: {} steps", event.snapshot.steps.len());
        Ok(Default::default())
    }

    async fn on_state_delta_event(
        &self,
        event: &StateDeltaEvent,
        _params: AgentSubscriberParams<'async_trait, Plan, ()>,
    ) -> Result<AgentStateMutation<Plan>, AgentError> {
        println!("delta patches: {}", event.delta.len());
        Ok(Default::default())
    }
}

Notes

  • Returning an error from a callback aborts the run: on_run_failed is invoked first, and the run then fails with that error.
  • Use stop_propagation for short‑circuiting multi‑subscriber pipelines.
  • For simple logging, implement on_event to observe every event.
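
The first note describes an ordering: a callback fails, on_run_failed is invoked, and the run then ends with the same error. A minimal model of that flow is sketched below. `Subscriber`, `Guard`, `FailureLog`, and `run` are hypothetical, synchronous stand-ins, and notifying every subscriber's on_run_failed is an assumption of this sketch, not something the notes above confirm.

```rust
use std::sync::Mutex;

// Simplified stand-ins for illustration; NOT the ag_ui_client definitions.
pub trait Subscriber {
    // Catch-all hook: sees every event.
    fn on_event(&self, _event: &str) -> Result<(), String> {
        Ok(())
    }
    // Failure hook: called before the run returns its error.
    fn on_run_failed(&self, _error: &str) {}
}

/// Rejects one specific event by returning an error.
pub struct Guard;

impl Subscriber for Guard {
    fn on_event(&self, event: &str) -> Result<(), String> {
        if event == "forbidden" {
            Err(format!("rejected event: {}", event))
        } else {
            Ok(())
        }
    }
}

/// Records every failure it is told about.
#[derive(Default)]
pub struct FailureLog {
    pub errors: Mutex<Vec<String>>,
}

impl Subscriber for FailureLog {
    fn on_run_failed(&self, error: &str) {
        self.errors.lock().unwrap().push(error.to_string());
    }
}

/// Delivers events in order; the first callback error notifies subscribers,
/// then aborts the run with that same error.
pub fn run(events: &[&str], subscribers: &[&dyn Subscriber]) -> Result<usize, String> {
    let mut handled = 0;
    for event in events {
        for subscriber in subscribers {
            if let Err(error) = subscriber.on_event(event) {
                for s in subscribers {
                    s.on_run_failed(&error);
                }
                return Err(error);
            }
        }
        handled += 1;
    }
    Ok(handled)
}

pub fn demo_abort() -> (Result<usize, String>, Vec<String>) {
    let guard = Guard;
    let log = FailureLog::default();
    let outcome = run(&["started", "forbidden", "finished"], &[&guard, &log]);
    let errors = log.errors.lock().unwrap().clone();
    (outcome, errors)
}
```

Here the "finished" event is never delivered: the run stops at "forbidden", and the failure log has already seen the error by the time `run` returns it.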