Predictive state updates

docs/content/docs/integrations/microsoft-agent-framework/shared-state/predictive-state-updates.mdx

import { IframeSwitcher } from "@/components/content";
import { TailoredContent, TailoredContentOption } from "@/components/react/tailored-content.tsx";
import { FaWrench } from "react-icons/fa";

<IframeSwitcher id="predictive-state-updates-example" exampleUrl="https://feature-viewer.copilotkit.ai/microsoft-agent-framework-dotnet/feature/predictive_state_updates?sidebar=false&chatDefaultOpen=false" codeUrl="https://feature-viewer.copilotkit.ai/microsoft-agent-framework-dotnet/feature/predictive_state_updates?view=code&sidebar=false&codeLayout=tabs" exampleLabel="Demo" codeLabel="Code" height="700px" />

<Callout type="info"> This example demonstrates predictive state updates in the CopilotKit Feature Viewer. </Callout>

## What is this?

Microsoft Agent Framework agents can stream state updates through AG-UI as tool arguments are generated by the LLM. CopilotKit surfaces these updates in the UI, enabling optimistic, real-time rendering. We call these predictive state updates.
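
To make the idea concrete, the sketch below simulates an LLM streaming the JSON arguments of a tool call in chunks and derives a provisional `observed_steps` list after each chunk. It is a minimal illustration, not the AG-UI implementation: the `complete_steps` and `simulate` helpers and the chunk boundaries are hypothetical, and a real integration would rely on the framework's own handling of partial arguments.

```python
import json
import re

# Matches one complete JSON string literal, including escaped characters.
_JSON_STRING = re.compile(r'"(?:[^"\\]|\\.)*"')
# Matches the start of the array stored under the "steps" key.
_STEPS_START = re.compile(r'"steps"\s*:\s*\[')


def complete_steps(partial_args: str) -> list[str]:
    """Return the items of the `steps` array that have been fully streamed."""
    start = _STEPS_START.search(partial_args)
    if start is None:
        return []  # the "steps" key has not arrived yet
    pos = start.end()
    steps: list[str] = []
    while True:
        # Skip separators between array items.
        while pos < len(partial_args) and partial_args[pos] in " \t\r\n,":
            pos += 1
        match = _JSON_STRING.match(partial_args, pos)
        if match is None:
            break  # end of array, or an item that is still being streamed
        steps.append(json.loads(match.group(0)))
        pos = match.end()
    return steps


def simulate(chunks: list[str]) -> list[list[str]]:
    """Accumulate chunks and record the predicted state after each one."""
    buffer = ""
    snapshots: list[list[str]] = []
    for chunk in chunks:
        buffer += chunk
        snapshots.append(complete_steps(buffer))
    return snapshots


if __name__ == "__main__":
    for predicted in simulate(['{"steps": ["Gather ', 'sources", "Draft', ' outline"]}']):
        print({"observed_steps": predicted})
```

Only fully closed string items are reported, so a half-streamed step never appears in the predicted state.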

## When should I use this?

Use predictive state updates when you want to:

- Keep users engaged during long-running operations
- Show step-by-step progress
- Build trust by exposing what the agent is doing now, not only at the end
- Enable agent steering (users can intervene if needed)

<Callout type="info" title="Source of truth"> When the tool completes, the agent emits a final state snapshot. Any predictive updates should be reflected in that final state or they will be overwritten. </Callout>
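
The overwrite behaviour described in the callout can be modelled in a few lines. This is a sketch under the assumption that a final snapshot replaces the client state wholesale; the two helper functions are hypothetical names used only for illustration.

```python
from copy import deepcopy
from typing import Any


def apply_predictive_update(state: dict[str, Any], key: str, value: Any) -> dict[str, Any]:
    """Return a new state with one provisional (predicted) value applied."""
    updated = deepcopy(state)
    updated[key] = value
    return updated


def apply_final_snapshot(state: dict[str, Any], snapshot: dict[str, Any]) -> dict[str, Any]:
    """Assumed semantics: the snapshot replaces the previous state entirely."""
    return deepcopy(snapshot)


if __name__ == "__main__":
    state = apply_predictive_update({"observed_steps": []}, "observed_steps", ["Plan", "Draft"])

    # The snapshot omits observed_steps, so the predicted steps are lost.
    print(apply_final_snapshot(state, {"result": "done"}))

    # The snapshot carries observed_steps, so the UI keeps showing them.
    print(apply_final_snapshot(state, {"observed_steps": ["Plan", "Draft"], "result": "done"}))
```

In practice this means the agent should write the same `observed_steps` values into its final state that it reported while the tool call was streaming.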

## Implementation

<Steps>
<Step>

### Define the state

We will define an `observed_steps` array that is updated while the agent performs long-running tasks.

<Tabs groupId="language" items={[".NET", "Python"]}>
  <Tab value=".NET">
    ```csharp title="agent/Program.cs (excerpt)"
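    using System.Text.Json.Serialization;

    // Shared state shape; the property is serialized under the "observed_steps" key.
    // The property name matches the full Program.cs listing in the next step.
    public class AgentStateSnapshot
    {
        [JsonPropertyName("observed_steps")]
        public List<string> Steps { get; set; } = new();
    }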
    ```
  </Tab>
  <Tab value="Python">
    ```python title="agent/src/agent.py (excerpt)"
    STATE_SCHEMA: dict[str, object] = {
        "observed_steps": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Array of completed steps"
        }
    }
    ```
  </Tab>
</Tabs>
</Step>
<Step>

### Emit the intermediate state (tool-based predictive updates)

Configure AG-UI state management to treat tool arguments as predictive updates to `observed_steps`. As the LLM streams arguments for the tool call, AG-UI emits state delta events immediately.

<Tabs groupId="language" items={[".NET", "Python"]}>
  <Tab value=".NET">
    ```csharp title="agent/Program.cs (excerpt)"
    using System.ComponentModel;
    using System.Text.Json;
    using System.Text.Json.Serialization;
    using Azure.AI.OpenAI;
    using Azure.Identity;
    using Microsoft.Agents.AI;
    using Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;
    using Microsoft.AspNetCore.Http.Json;
    using Microsoft.Extensions.AI;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Options;

    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddAGUI();
    // Register a source-generated serializer context for fast, typed JSON
    builder.Services.ConfigureHttpJsonOptions(options =>
        options.SerializerOptions.TypeInfoResolverChain.Add(AGUIDojoServerSerializerContext.Default));

    var app = builder.Build();

    string endpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]!;
    string deployment = builder.Configuration["AZURE_OPENAI_DEPLOYMENT_NAME"]!;

    // Define a tool the LLM may call as it progresses to report partial steps
    [Description("Report current step progress.")]
    static string StepProgress([Description("Steps completed so far")] string[] steps)
        => "Progress received.";

    // Create the base agent with the reporting tool
    var baseAgent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
        .GetChatClient(deployment)
        .CreateAIAgent(
            name: "AGUIAssistant",
            instructions: "You are a helpful assistant that may call the 'step_progress' tool to report intermediate steps.",
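            // Name the tool explicitly so it matches the "step_progress" check in StateStreamingAgent
            tools: [AIFunctionFactory.Create(StepProgress, name: "step_progress")]);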

    // Wrap with a streaming middleware that emits interim state snapshots (typed, source-generated).
    // See the "Stream state from your agent" section in the Agent State guide for a full example of a DelegatingAIAgent
    // that reads streaming updates and emits DataContent with an AgentStateSnapshot.
    var jsonOptions = app.Services.GetRequiredService<IOptions<JsonOptions>>();
    AIAgent agent = new StateStreamingAgent(baseAgent, jsonOptions.Value.SerializerOptions);

    app.MapAGUI("/", agent);
    await app.RunAsync();

    // Example: streaming agent wrapper emitting state snapshots (simplified)
    internal sealed class StateStreamingAgent : DelegatingAIAgent
    {
        private readonly JsonSerializerOptions _jsonOptions;
        public StateStreamingAgent(AIAgent inner, JsonSerializerOptions jsonOptions) : base(inner)
        {
            _jsonOptions = jsonOptions;
        }

        public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
            IEnumerable<ChatMessage> messages,
            AgentThread? thread = null,
            AgentRunOptions? options = null,
            [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            var observedSteps = new List<string>();
            await foreach (var update in this.InnerAgent.RunStreamingAsync(messages, thread, options, cancellationToken))
            {
                // Inspect streaming contents for function calls and collect step arguments as they arrive
                foreach (var content in update.Contents)
                {
                    // FunctionCallContent.Arguments is a dictionary keyed by parameter name
                    if (content is FunctionCallContent f
                        && string.Equals(f.Name, "step_progress", StringComparison.OrdinalIgnoreCase)
                        && f.Arguments is { } args)
                    {
                        if (args.TryGetValue("steps", out var stepsValue) && stepsValue is JsonElement stepsElement)
                        {
                            if (stepsElement.Deserialize(_jsonOptions.GetTypeInfo(typeof(string[]))) is string[] steps)
                            {
                                observedSteps.Clear();
                                foreach (var s in steps)
                                {
                                    observedSteps.Add(s);
                                }
                                // Emit a typed state snapshot into the AG-UI stream
                                var snapshot = new AgentStateSnapshot { Steps = observedSteps };
                                byte[] stateBytes = JsonSerializer.SerializeToUtf8Bytes(
                                    snapshot,
                                    _jsonOptions.GetTypeInfo(typeof(AgentStateSnapshot)));
                                yield return new AgentRunResponseUpdate
                                {
                                    Contents = [ new DataContent(stateBytes, "application/json") ]
                                };
                            }
                        }
                    }
                }

                // Always forward the original update (text deltas / final tool results, etc.)
                yield return update;
            }
        }
    }

    // Typed state snapshot for source-generated JSON
    internal sealed class AgentStateSnapshot
    {
        [JsonPropertyName("observed_steps")]
        public List<string> Steps { get; set; } = new();
    }

    // Source-generated serializer context (register above via ConfigureHttpJsonOptions)
    [JsonSerializable(typeof(AgentStateSnapshot))]
    [JsonSerializable(typeof(string[]))]
    internal sealed partial class AGUIDojoServerSerializerContext : JsonSerializerContext;
    ```
  </Tab>
  <Tab value="Python">
    ```python title="agent/src/agent.py (excerpt)"
    from __future__ import annotations
    from typing import Annotated
    from agent_framework import Agent, SupportsChatGetResponse, tool
    from agent_framework_ag_ui import AgentFrameworkAgent
    from pydantic import Field

    # 1) Define state schema for AG-UI
    STATE_SCHEMA: dict[str, object] = {
        "observed_steps": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Array of completed steps"
        }
    }

    # 2) Predictive state mapping: observed_steps <- step_progress.steps
    PREDICT_STATE_CONFIG: dict[str, dict[str, str]] = {
        "observed_steps": {
            "tool": "step_progress",
            "tool_argument": "steps",
        }
    }

    # 3) Tool that the LLM will call with step updates
    @tool
    def step_progress(
        steps: Annotated[list[str], Field(description="Steps completed so far")]
    ) -> str:
        return "Progress received."

    def create_agent(chat_client: SupportsChatGetResponse) -> AgentFrameworkAgent:
        base = Agent(
            name="sample_agent",
            instructions="You are a task performer. Report progress using step_progress.",
            client=chat_client,
            tools=[step_progress],
        )
        return AgentFrameworkAgent(
            agent=base,
            name="CopilotKitMicrosoftAgentFrameworkAgent",
            description="Agent with predictive state updates for observed steps.",
            state_schema=STATE_SCHEMA,
            predict_state_config=PREDICT_STATE_CONFIG,
            require_confirmation=False,
        )
    ```
  </Tab>
</Tabs>
<Callout>
  With this configuration, AG-UI emits predictive state updates as soon as the model streams the tool arguments, without waiting for tool completion.
</Callout>
</Step>
<Step>

### Observe predictions on the client

Add a state renderer to observe the predicted `observed_steps` updates as they stream in.

```tsx title="ui/app/page.tsx"
import { useAgent } from "@copilotkit/react-core/v2";

type AgentState = {
  observed_steps: string[];
};

export default function Page() {
  // Access both predicted and final states
  const { agent } = useAgent({ agentId: "sample_agent" });

  // Observe predictions (render inside the chat)
  useAgent({
    agentId: "sample_agent",
    render: ({ state }) => {
      if (!state.observed_steps?.length) return null;
      return (
        <div>
          <h3>Current Progress:</h3>
          <ul>
            {state.observed_steps.map((step, i) => (
              <li key={i}>{step}</li>
            ))}
          </ul>
        </div>
      );
    },
  });

  return <div>...</div>;
}
```
</Step>
<Step>

### Give it a try!

Ask the agent to perform a multi-step task (e.g., “write a short outline and report progress each step”). You’ll see `observed_steps` update in real time as the tool arguments stream in.

</Step>
</Steps>