# State streaming


<IframeSwitcher id="predictive-state-updates-example" exampleUrl="https://feature-viewer.copilotkit.ai/langgraph/feature/predictive_state_updates?sidebar=false&chatDefaultOpen=false" codeUrl="https://feature-viewer.copilotkit.ai/langgraph/feature/predictive_state_updates?view=code&sidebar=false&codeLayout=tabs" exampleLabel="Demo" codeLabel="Code" height="700px" />

<Callout type="info"> This example demonstrates predictive state updates in the [CopilotKit Feature Viewer](https://feature-viewer.copilotkit.ai/langgraph/feature/predictive_state_updates). </Callout>

## What is this?

A Deep Agents agent's state updates discontinuously: only at node transitions in the graph. But even a single node in the graph often takes many seconds to run and contains sub-steps of interest to the user.

Agent-native applications reflect to the end user what the agent is doing as continuously as possible.

CopilotKit enables this through its concept of predictive state updates.

## When should I use this?

You can use this when you want to provide the user with feedback about what your agent is doing, specifically to:

  • Keep users engaged by avoiding long loading indicators
  • Build trust by demonstrating what the agent is working on
  • Enable agent steering - allowing users to course-correct the agent if needed

## Important Note

When a node in your LangGraph finishes executing, its returned state becomes the single source of truth. While intermediate state updates are great for real-time feedback, any changes you want to persist must be explicitly included in the node's final returned state. Otherwise, they will be overwritten when the node completes.
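To make this persistence rule concrete, here is a minimal, self-contained sketch. The `emit` callable is a stand-in for `copilotkit_emit_state`, and all names are illustrative; the point is that intermediate emits are transient, while only the node's returned dict survives.

```python
import asyncio

async def research_node(state: dict, emit) -> dict:
    """Streams intermediate state, then persists it in the return value.

    `emit` stands in for copilotkit_emit_state(config, state); in a real
    graph you would call that SDK function instead.
    """
    steps = ["Gathering sources...", "Drafting summary..."]
    for step in steps:
        # Intermediate updates: great for live feedback, but NOT durable.
        state["observed_steps"] = state.get("observed_steps", []) + [step]
        await emit(state)

    # Only what the node *returns* becomes the source of truth.
    # Omitting observed_steps here would discard the streamed steps.
    return {"observed_steps": state["observed_steps"]}

async def main():
    snapshots = []

    async def record(state):
        # Stand-in emitter: records each intermediate snapshot.
        snapshots.append(list(state["observed_steps"]))

    final = await research_node({}, record)
    print(len(snapshots), final["observed_steps"])

asyncio.run(main())
```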

## Implementation

<TailoredContent id="agent-type"> <TailoredContentOption id="custom-graph" title="Custom graph" description="I'm using a custom graph, where I define the nodes and edges myself." > <Steps> <Step> ### Install the CopilotKit SDK <InstallSDKSnippet/> </Step> <Step> ### Define the state We'll define an `observed_steps` field in the state, which will be updated as the agent reports progress on its steps.
    <Tabs groupId="agent_language" items={["Python", "TypeScript"]} persist>
        <Tab value="Python">
            ```python title="agent.py"
            from copilotkit import CopilotKitState

            class AgentState(CopilotKitState):
                observed_steps: list[str]  # Array of completed steps
            ```
        </Tab>
        <Tab value="TypeScript">
            ```ts title="agent.ts"
            import { createMiddleware } from "langchain";
            import { copilotkitMiddleware } from "@copilotkit/sdk-js/langgraph"; // [!code highlight]
            import { z } from "zod";

            export const observedStepsMiddleware = createMiddleware({
                name: "AgentState",
                stateSchema: z.object({
                    observed_steps: z.array(z.string()).default([]), // Array of completed steps
                }),
            });

            // createDeepAgent({ middleware: [observedStepsMiddleware, copilotkitMiddleware], ... })
            ```
        </Tab>
    </Tabs>
</Step>
<Step>
    ### Emit the intermediate state
    <TailoredContent
        id="state-emission"
        header={
            <div>
                <p className="text-xl font-semibold">How would you like to emit state updates?</p>
                <p className="text-base">
                    You can either manually emit state updates or configure specific tool calls to emit updates.
                </p>
            </div>
        }
    >
        <TailoredContentOption
            id="manual-emission"
            title="Manual Predictive State Updates"
            description="Manually emit state updates for maximum control over when updates occur."
            icon={<FaArrowUp />}
        >
            For long-running tasks, you can emit state updates progressively as predictions of the final state. In this example, we simulate a long-running task by executing a series of steps with a one second delay between each update.
            <Tabs groupId="agent_language" items={['Python', 'TypeScript']} persist>
                <Tab value="Python">
                    ```python title="agent.py"
                    import asyncio

                    from copilotkit.langgraph import copilotkit_emit_state # [!code highlight]
                    # ...
                    async def chat_node(state: AgentState, config: RunnableConfig) -> Command[Literal["cpk_action_node", "tool_node", "__end__"]]:
                        # ...

                        # Simulate executing steps one by one
                        steps = [
                            "Analyzing input data...",
                            "Identifying key patterns...",
                            "Generating recommendations...",
                            "Formatting final output..."
                        ]

                        for step in steps:
                            state["observed_steps"] = state.get("observed_steps", []) + [step]
                            await copilotkit_emit_state(config, state) # [!code highlight]
                            await asyncio.sleep(1)

                        # ...
                    ```
                </Tab>
                <Tab value="TypeScript">
                    ```ts title="agent.ts"
                    import { copilotkitEmitState } from "@copilotkit/sdk-js/langgraph"; // [!code highlight]

                    // Inside your custom graph node or a middleware hook that has access to state and config:
                    async function chatNode(state: any, config: any) {
                        const steps = [
                            "Analyzing input data...",
                            "Identifying key patterns...",
                            "Generating recommendations...",
                            "Formatting final output...",
                        ];

                        for (const step of steps) {
                            state.observed_steps = [...(state.observed_steps ?? []), step];
                            await copilotkitEmitState(config, state); // [!code highlight]
                            await new Promise((resolve) => setTimeout(resolve, 1000));
                        }
                    }
                    ```
                </Tab>
            </Tabs>
        </TailoredContentOption>

        <TailoredContentOption
            id="tool-emission"
            title="Tool-Based Predictive State Updates"
            description="Configure specific tool calls to automatically emit intermediate state updates."
            icon={<FaWrench />}
        >
            For long-running tasks, you can configure CopilotKit to automatically predict state updates when specific tool calls are made. In this example, we'll configure CopilotKit to predict state updates whenever the LLM calls the step progress tool.
            <Tabs groupId="agent_language" items={['Python', 'TypeScript']} persist>
                <Tab value="Python">
                    ```python
                    from copilotkit.langgraph import copilotkit_customize_config
                    from langgraph.types import Command
                    from langgraph.graph import END
                    from langchain.tools import tool
                    from langchain_openai import ChatOpenAI
                    from langchain_core.messages import SystemMessage, AIMessage
                    from langchain_core.runnables import RunnableConfig

                    # Define a step progress tool for the LLM to report its steps
                    @tool
                    def step_progress_tool(steps: list[str]):
                        """Reports the current steps being executed"""

                    async def frontend_actions_node(state: AgentState, config: RunnableConfig):
                        # Configure CopilotKit to treat step progress tool calls as predictive of the final state
                        config = copilotkit_customize_config(
                            config,
                            emit_intermediate_state=[
                                {
                                    "state_key": "observed_steps",
                                    "tool": "step_progress_tool",
                                    "tool_argument": "steps"
                                },
                            ]
                        )

                        system_message = SystemMessage(
                            content="You are a task performer. Pretend to perform the tasks you are given and report the steps using step_progress_tool."
                        )

                        # Provide the actions to the LLM
                        model = ChatOpenAI(model="gpt-4").bind_tools(
                            [
                                *state["copilotkit"]["actions"],
                                step_progress_tool
                                # your other tools here
                            ],
                        )

                        # Call the model with CopilotKit's modified config
                        response = await model.ainvoke([
                            system_message,
                            *state["messages"],
                        ], config)

                        # Set the steps in state so they are persisted and communicated to the frontend
                        if isinstance(response, AIMessage) and response.tool_calls and response.tool_calls[0].get("name") == "step_progress_tool":
                            return Command(
                                goto=END,
                                update={
                                    "messages": response,
                                    "observed_steps": response.tool_calls[0].get("args", {}).get("steps", [])
                                }
                            )

                        return Command(goto=END, update={"messages": response})
                    ```
                </Tab>
                <Tab value="TypeScript">
                    ```ts
                    import { copilotkitCustomizeConfig } from "@copilotkit/sdk-js/langgraph"; // [!code highlight]
                    import { tool } from "langchain";
                    import { ChatOpenAI } from "@langchain/openai";
                    import { SystemMessage } from "@langchain/core/messages";
                    import { Command } from "@langchain/langgraph";
                    import { z } from "zod";

                    const stepProgressTool = tool(
                        async (args) => args,
                        {
                            name: "step_progress_tool",
                            description: "Reports the current steps being executed",
                            schema: z.object({ steps: z.array(z.string()) }),
                        }
                    );

                    async function frontendActionsNode(state: any, config: any) {
                        // Configure CopilotKit to treat step_progress_tool calls as predictive of the final state
                        const modifiedConfig = copilotkitCustomizeConfig(config, {
                            emitIntermediateState: [
                                {
                                    stateKey: "observed_steps",
                                    tool: "step_progress_tool",
                                    toolArgument: "steps",
                                },
                            ],
                        });

                        const model = new ChatOpenAI({ model: "gpt-4" }).bindTools([
                            ...(state.copilotkit?.actions ?? []),
                            stepProgressTool,
                        ]);

                        const response = await model.invoke(
                            [
                                new SystemMessage("You are a task performer. Pretend to perform the tasks you are given and report the steps using step_progress_tool."),
                                ...state.messages,
                            ],
                            modifiedConfig,
                        );

                        if (response.tool_calls?.[0]?.name === "step_progress_tool") {
                            return new Command({
                                goto: "__end__",
                                update: {
                                    messages: response,
                                    observed_steps: response.tool_calls[0].args.steps,
                                },
                            });
                        }

                        return new Command({ goto: "__end__", update: { messages: response } });
                    }
                    ```
                </Tab>
            </Tabs>
        </TailoredContentOption>
    </TailoredContent>
</Step>
<Step>
    ### Observe the predictions
    These predictions will be emitted as the agent runs, allowing you to track its progress before the final state is determined.

    ```tsx title="ui/app/page.tsx"
    import { useAgent } from '@copilotkit/react-core/v2'; // [!code highlight]

    const YourMainContent = () => {
        // [!code highlight:3]
        const { agent } = useAgent({
            agentId: "sample_agent",
        });

        const observedSteps = (agent.state.observed_steps as string[]) ?? [];

        return (
            <div>
                <h1>Agent Progress</h1>
                {observedSteps.length > 0 && (
                    <div>
                        <h3>Steps:</h3>
                        <ul>
                            {observedSteps.map((step, i) => (
                                <li key={i}>{step}</li>
                            ))}
                        </ul>
                    </div>
                )}
            </div>
        )
    }
    ```
</Step>
<Step>
    ### Give it a try!
    Now you'll notice that the state predictions are emitted as the agent makes progress, giving you insight into its work before the final state is determined.
    You can apply this pattern to any long-running task in your agent.

    <video src="https://cdn.copilotkit.ai/docs/copilotkit/images/coagents/intermediate-state-render.mp4" className="rounded-lg shadow-xl" loop playsInline controls autoPlay muted />
    <Callout>
      This video shows the result of `npx copilotkit@latest init` with the [implementation](#implementation) section applied to it!
    </Callout>
</Step>
</Steps>
</TailoredContentOption>

<TailoredContentOption id="prebuilt" title="Prebuilt agent" description="I'm using a prebuilt agent, like createDeepAgent or create_agent.">

<Steps>
  <Step>
    ### Define the state schema
    We'll define an `observed_steps` field in the state, which will be updated as the agent reports progress.

    <Tabs groupId="agent_language" items={["Python", "TypeScript"]} persist>
      <Tab value="Python">
        ```python title="agent.py"
        from copilotkit import CopilotKitState

        class AgentState(CopilotKitState):
            observed_steps: list[str]
        ```
      </Tab>
      <Tab value="TypeScript">
        ```ts title="agent.ts"
        import { createMiddleware } from "langchain";
        import { copilotkitMiddleware } from "@copilotkit/sdk-js/langgraph"; // [!code highlight]
        import { z } from "zod";

        export const observedStepsMiddleware = createMiddleware({
            name: "AgentState",
            stateSchema: z.object({
                observed_steps: z.array(z.string()).default([]),
            }),
        });
        ```
      </Tab>
    </Tabs>
  </Step>
  <Step>
    ### Add StateStreamingMiddleware to your agent
    Instead of manually calling emit functions or configuring the `RunnableConfig`, pass `StateStreamingMiddleware` as middleware to your prebuilt agent.
    Each `StateItem` maps a tool argument to a state key — the same mapping concept as the tool-based approach in custom graphs.

    <Tabs groupId="agent_language" items={["Python", "TypeScript"]} persist>
      <Tab value="Python">
        ```python title="agent.py"
        from copilotkit import CopilotKitMiddleware, StateStreamingMiddleware, StateItem  # [!code highlight]
        from deepagents import create_deep_agent
        from langchain.tools import tool

        @tool
        def step_progress_tool(steps: list[str]):
            """Reports the current steps being executed"""

        agent = create_deep_agent(
            model="openai:gpt-5.4",
            tools=[step_progress_tool],
            middleware=[
                CopilotKitMiddleware(),
                StateStreamingMiddleware(  # [!code highlight]
                    StateItem(state_key="observed_steps", tool="step_progress_tool", tool_argument="steps")  # [!code highlight]
                ),  # [!code highlight]
            ],
            system_prompt="You are a task performer. Report your steps using step_progress_tool.",
        )
        ```
      </Tab>
      <Tab value="TypeScript">
        ```ts title="agent.ts"
        import { createDeepAgent } from "deepagents";
        import { copilotkitMiddleware } from "@copilotkit/sdk-js/langgraph";
        import { stateStreamingMiddleware, stateItem } from "@copilotkit/sdk-js/langgraph-middlewares"; // [!code highlight]
        import { tool } from "langchain";
        import { z } from "zod";

        const stepProgressTool = tool(
            async (args) => args,
            {
                name: "step_progress_tool",
                description: "Reports the current steps being executed",
                schema: z.object({ steps: z.array(z.string()) }),
            }
        );

        export const agent = createDeepAgent({
            model: "openai:gpt-5.4",
            tools: [stepProgressTool],
            middleware: [
                observedStepsMiddleware,
                copilotkitMiddleware,
                stateStreamingMiddleware(  // [!code highlight]
                    stateItem({ stateKey: "observed_steps", tool: "step_progress_tool", toolArgument: "steps" })  // [!code highlight]
                ),  // [!code highlight]
            ],
            systemPrompt: "You are a task performer. Report your steps using step_progress_tool.",
        });
        ```
      </Tab>
    </Tabs>
  </Step>
  <Step>
    ### Observe the predictions
    These predictions will be emitted as the agent runs, allowing you to track its progress before the final state is determined.

    ```tsx title="ui/app/page.tsx"
    import { useAgent } from '@copilotkit/react-core/v2'; // [!code highlight]

    const YourMainContent = () => {
        // [!code highlight:3]
        const { agent } = useAgent({
            agentId: "sample_agent",
        });

        const observedSteps = (agent.state.observed_steps as string[]) ?? [];

        return (
            <div>
                <h1>Agent Progress</h1>
                {observedSteps.length > 0 && (
                    <div>
                        <h3>Steps:</h3>
                        <ul>
                            {observedSteps.map((step, i) => (
                                <li key={i}>{step}</li>
                            ))}
                        </ul>
                    </div>
                )}
            </div>
        )
    }
    ```
  </Step>
  <Step>
    ### Give it a try!
    Now you'll notice that the state predictions are emitted as the agent makes progress, giving you insight into its work before the final state is determined.
    You can apply this pattern to any long-running task in your agent.

    <video src="https://cdn.copilotkit.ai/docs/copilotkit/images/coagents/intermediate-state-render.mp4" className="rounded-lg shadow-xl" loop playsInline controls autoPlay muted />
    <Callout>
      This video shows the result of `npx copilotkit@latest init` with the [implementation](#implementation) section applied to it!
    </Callout>
  </Step>
</Steps>
</TailoredContentOption> </TailoredContent>