docs/examples/workflow/function_calling_agent.ipynb
This notebook walks through setting up a Workflow to construct a function calling agent from scratch.
Function calling agents work by using an LLM that supports tools/functions in its API (OpenAI, Ollama, Anthropic, etc.) to call functions and use tools.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
Set up tracing to visualize each step in the workflow.
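One option (a minimal sketch; it assumes the Arize Phoenix callback integration is installed, for example with pip install arize-phoenix llama-index-callbacks-arize-phoenix, and that a Phoenix instance is reachable) is to register LlamaIndex's global handler:
import llama_index.core

# send a trace of each workflow step and LLM call to Arize Phoenix
llama_index.core.set_global_handler("arize_phoenix")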
Since workflows are async first, this all runs fine in a notebook. If you were running in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running.
async def main():
    ...  # your async code goes here

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
An agent consists of several steps: preparing the chat history from the incoming user message, prompting the LLM with the chat history and tools, and executing any tool calls before looping back to the LLM until there are none.
To handle these steps, we need to define a few events: an InputEvent carrying the prepared chat history to the LLM, a StreamEvent carrying streaming deltas back to the caller, a ToolCallEvent carrying tool calls to execute, and a FunctionOutputEvent carrying a tool's output.
The other steps will use the built-in StartEvent and StopEvent events.
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event
class InputEvent(Event):
input: list[ChatMessage]
class StreamEvent(Event):
delta: str
class ToolCallEvent(Event):
tool_calls: list[ToolSelection]
class FunctionOutputEvent(Event):
output: ToolOutput
With our events defined, we can construct our workflow and steps.
Note that the workflow automatically validates itself using type annotations, so the type annotations on our steps are very helpful!
from typing import Any, List
from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
class FunctionCallingAgent(Workflow):
def __init__(
self,
*args: Any,
llm: FunctionCallingLLM | None = None,
tools: List[BaseTool] | None = None,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.tools = tools or []
self.llm = llm or OpenAI()
assert self.llm.metadata.is_function_calling_model
@step
async def prepare_chat_history(
self, ctx: Context, ev: StartEvent
) -> InputEvent:
# clear sources
await ctx.store.set("sources", [])
# check if memory is setup
memory = await ctx.store.get("memory", default=None)
if not memory:
memory = ChatMemoryBuffer.from_defaults(llm=self.llm)
# get user input
user_input = ev.input
user_msg = ChatMessage(role="user", content=user_input)
memory.put(user_msg)
# get chat history
chat_history = memory.get()
# update context
await ctx.store.set("memory", memory)
return InputEvent(input=chat_history)
@step
async def handle_llm_input(
self, ctx: Context, ev: InputEvent
) -> ToolCallEvent | StopEvent:
chat_history = ev.input
# stream the response
response_stream = await self.llm.astream_chat_with_tools(
self.tools, chat_history=chat_history
)
async for response in response_stream:
ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))
# save the final response, which should have all content
memory = await ctx.store.get("memory")
memory.put(response.message)
await ctx.store.set("memory", memory)
# get tool calls
tool_calls = self.llm.get_tool_calls_from_response(
response, error_on_no_tool_call=False
)
if not tool_calls:
sources = await ctx.store.get("sources", default=[])
return StopEvent(
result={"response": response, "sources": [*sources]}
)
else:
return ToolCallEvent(tool_calls=tool_calls)
@step
async def handle_tool_calls(
self, ctx: Context, ev: ToolCallEvent
) -> InputEvent:
tool_calls = ev.tool_calls
tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}
tool_msgs = []
sources = await ctx.store.get("sources", default=[])
# call tools -- safely!
for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)

            # the LLM may request a tool that doesn't exist -- report it instead of crashing
            if not tool:
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=f"Tool {tool_call.tool_name} does not exist",
                        additional_kwargs={
                            "tool_call_id": tool_call.tool_id,
                            "name": tool_call.tool_name,
                        },
                    )
                )
                continue

            additional_kwargs = {
                "tool_call_id": tool_call.tool_id,
                "name": tool.metadata.get_name(),
            }
try:
tool_output = tool(**tool_call.tool_kwargs)
sources.append(tool_output)
tool_msgs.append(
ChatMessage(
role="tool",
content=tool_output.content,
additional_kwargs=additional_kwargs,
)
)
except Exception as e:
tool_msgs.append(
ChatMessage(
role="tool",
content=f"Encountered error in tool call: {e}",
additional_kwargs=additional_kwargs,
)
)
# update memory
memory = await ctx.store.get("memory")
for msg in tool_msgs:
memory.put(msg)
await ctx.store.set("sources", sources)
await ctx.store.set("memory", memory)
chat_history = memory.get()
return InputEvent(input=chat_history)
And that's it! Let's explore the workflow we wrote a bit.
prepare_chat_history():
This is our main entry point. It handles adding the user message to memory, and uses the memory to get the latest chat history. It returns an InputEvent.
handle_llm_input():
Triggered by an InputEvent, it uses the chat history and tools to prompt the LLM. If tool calls are found, a ToolCallEvent is emitted. Otherwise, we say the workflow is done and emit a StopEvent.
handle_tool_calls():
Triggered by a ToolCallEvent, it calls tools with error handling and returns the tool outputs. This step creates a loop: it emits an InputEvent, which takes us back to handle_llm_input(), until the LLM stops requesting tool calls and the workflow ends with a StopEvent.
NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120s.
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
def add(x: int, y: int) -> int:
"""Useful function to add two numbers."""
return x + y
def multiply(x: int, y: int) -> int:
"""Useful function to multiply two numbers."""
return x * y
tools = [
FunctionTool.from_defaults(add),
FunctionTool.from_defaults(multiply),
]
agent = FunctionCallingAgent(
llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=True
)
ret = await agent.run(input="Hello!")
print(ret["response"])
ret = await agent.run(input="What is (2123 + 2321) * 312?")
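For reference, (2123 + 2321) * 312 = 4444 * 312 = 1,386,528, so we can print the response to check that the agent's tool calls arrived at that value:
print(ret["response"])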
By default, the workflow creates a fresh Context for each run. This means that the chat history is not preserved between runs. However, we can pass our own Context to the workflow to preserve chat history.
from llama_index.core.workflow import Context
ctx = Context(agent)
ret = await agent.run(input="Hello! My name is Logan.", ctx=ctx)
print(ret["response"])
ret = await agent.run(input="What is my name?", ctx=ctx)
print(ret["response"])
Using the handler returned from the .run() method, we can also access the streaming events.
agent = FunctionCallingAgent(
llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=False
)
handler = agent.run(input="Hello! Write me a short story about a cat.")
async for event in handler.stream_events():
if isinstance(event, StreamEvent):
print(event.delta, end="", flush=True)
response = await handler
# print(response["response"])
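Because the workflow only depends on the FunctionCallingLLM interface, any LLM with tool/function support can be swapped in. A minimal sketch, assuming llama-index-llms-anthropic is installed, ANTHROPIC_API_KEY is set, and the model name is still current:
from llama_index.llms.anthropic import Anthropic

agent = FunctionCallingAgent(
    llm=Anthropic(model="claude-3-5-sonnet-latest"),
    tools=tools,
    timeout=120,
    verbose=False,
)

ret = await agent.run(input="What is (2123 + 2321) * 312?")
print(ret["response"])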