examples/langgraph-agent/agent.ipynb
The following example demonstrates building an agent using LangGraph. Graphiti is used to personalize agent responses based on information learned from prior conversations. Additionally, a database of products is loaded into the Graphiti graph, enabling the agent to speak to these products.
The agent implements:
pip install graphiti-core langchain-openai langgraph ipywidgets
Ensure that you've followed the Graphiti installation instructions. In particular, installation of neo4j.
import asyncio
import json
import logging
import os
import sys
import uuid
from contextlib import suppress
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated
import ipywidgets as widgets
from dotenv import load_dotenv
from IPython.display import Image, display
from typing_extensions import TypedDict
load_dotenv()
def setup_logging():
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = setup_logging()
If you'd like to trace your agent using LangSmith, ensure that you have a LANGSMITH_API_KEY set in your environment.
Then set os.environ['LANGCHAIN_TRACING_V2'] = 'false' to true.
os.environ['LANGCHAIN_TRACING_V2'] = 'false'
os.environ['LANGCHAIN_PROJECT'] = 'Graphiti LangGraph Tutorial'
Ensure that you have neo4j running and a database created. Ensure that you've configured the following in your environment.
NEO4J_URI=
NEO4J_USER=
NEO4J_PASSWORD=
# Configure Graphiti
from graphiti_core import Graphiti
from graphiti_core.edges import EntityEdge
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.maintenance.graph_data_operations import clear_data
neo4j_uri = os.environ.get('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.environ.get('NEO4J_USER', 'neo4j')
neo4j_password = os.environ.get('NEO4J_PASSWORD', 'password')
client = Graphiti(
neo4j_uri,
neo4j_user,
neo4j_password,
)
The following is only required for the first run of this notebook or when you'd like to start your database over.
IMPORTANT: clear_data is destructive and will wipe your entire database.
# Note: This will clear the database
await clear_data(client.driver)
await client.build_indices_and_constraints()
Load several shoe and related products into the Graphiti. This may take a while.
IMPORTANT: This only needs to be done once. If you run clear_data you'll need to rerun this step.
async def ingest_products_data(client: Graphiti):
script_dir = Path.cwd().parent
json_file_path = script_dir / 'data' / 'manybirds_products.json'
with open(json_file_path) as file:
products = json.load(file)['products']
for i, product in enumerate(products):
await client.add_episode(
name=product.get('title', f'Product {i}'),
episode_body=str({k: v for k, v in product.items() if k != 'images'}),
source_description='ManyBirds products',
source=EpisodeType.json,
reference_time=datetime.now(timezone.utc),
)
await ingest_products_data(client)
In your own app, this step could be done later once the user has identified themselves and made their sales intent known. We do this here so we can configure the agent with the user's node_uuid.
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_EPISODE_MENTIONS
user_name = 'jess'
await client.add_episode(
name='User Creation',
episode_body=(f'{user_name} is interested in buying a pair of shoes'),
source=EpisodeType.text,
reference_time=datetime.now(timezone.utc),
source_description='SalesBot',
)
# let's get Jess's node uuid
nl = await client._search(user_name, NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
user_node_uuid = nl.nodes[0].uuid
# and the ManyBirds node uuid
nl = await client._search('ManyBirds', NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
manybirds_node_uuid = nl.nodes[0].uuid
def edges_to_facts_string(entities: list[EntityEdge]):
return '-' + '\n- '.join([edge.fact for edge in entities])
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
get_shoe_data ToolThe agent will use this to search the Graphiti graph for information about shoes. We center the search on the manybirds_node_uuid to ensure we rank shoe-related data over user data.
@tool
async def get_shoe_data(query: str) -> str:
"""Search the graphiti graph for information about shoes"""
edge_results = await client.search(
query,
center_node_uuid=manybirds_node_uuid,
num_results=10,
)
return edges_to_facts_string(edge_results)
tools = [get_shoe_data]
tool_node = ToolNode(tools)
llm = ChatOpenAI(model='gpt-4.1-mini', temperature=0).bind_tools(tools)
# Test the tool node
await tool_node.ainvoke({'messages': [await llm.ainvoke('wool shoes')]})
The chatbot uses Graphiti to provide context-aware responses in a shoe sales scenario. Here's how it works:
Context Retrieval: It searches the Graphiti graph for relevant information based on the latest message, using the user's node as the center point. This ensures that user-related facts are ranked higher than other information in the graph.
System Message: It constructs a system message incorporating facts from Graphiti, setting the context for the AI's response.
Knowledge Persistence: After generating a response, it asynchronously adds the interaction to the Graphiti graph, allowing future queries to reference this conversation.
This approach enables the chatbot to maintain context across interactions and provide personalized responses based on the user's history and preferences stored in the Graphiti graph.
class State(TypedDict):
messages: Annotated[list, add_messages]
user_name: str
user_node_uuid: str
async def chatbot(state: State):
facts_string = None
if len(state['messages']) > 0:
last_message = state['messages'][-1]
graphiti_query = f'{"SalesBot" if isinstance(last_message, AIMessage) else state["user_name"]}: {last_message.content}'
# search graphiti using Jess's node uuid as the center node
# graph edges (facts) further from the Jess node will be ranked lower
edge_results = await client.search(
graphiti_query, center_node_uuid=state['user_node_uuid'], num_results=5
)
facts_string = edges_to_facts_string(edge_results)
system_message = SystemMessage(
content=f"""You are a skillfull shoe salesperson working for ManyBirds. Review information about the user and their prior conversation below and respond accordingly.
Keep responses short and concise. And remember, always be selling (and helpful!)
Things you'll need to know about the user in order to close a sale:
- the user's shoe size
- any other shoe needs? maybe for wide feet?
- the user's preferred colors and styles
- their budget
Ensure that you ask the user for the above if you don't already know.
Facts about the user and their conversation:
{facts_string or 'No facts about the user and their conversation'}"""
)
messages = [system_message] + state['messages']
response = await llm.ainvoke(messages)
# add the response to the graphiti graph.
# this will allow us to use the graphiti search later in the conversation
# we're doing async here to avoid blocking the graph execution
asyncio.create_task(
client.add_episode(
name='Chatbot Response',
episode_body=f'{state["user_name"]}: {state["messages"][-1]}\nSalesBot: {response.content}',
source=EpisodeType.message,
reference_time=datetime.now(timezone.utc),
source_description='Chatbot',
)
)
return {'messages': [response]}
This section sets up the Agent's LangGraph graph:
Graph Structure: It defines a graph with nodes for the agent (chatbot) and tools, connected in a loop.
Conditional Logic: The should_continue function determines whether to end the graph execution or continue to the tools node based on the presence of tool calls.
Memory Management: It uses a MemorySaver to maintain conversation state across turns. This is in addition to using Graphiti for facts.
graph_builder = StateGraph(State)
memory = MemorySaver()
# Define the function that determines whether to continue or not
async def should_continue(state, config):
messages = state['messages']
last_message = messages[-1]
# If there is no function call, then we finish
if not last_message.tool_calls:
return 'end'
# Otherwise if there is, we continue
else:
return 'continue'
graph_builder.add_node('agent', chatbot)
graph_builder.add_node('tools', tool_node)
graph_builder.add_edge(START, 'agent')
graph_builder.add_conditional_edges('agent', should_continue, {'continue': 'tools', 'end': END})
graph_builder.add_edge('tools', 'agent')
graph = graph_builder.compile(checkpointer=memory)
Our LangGraph agent graph is illustrated below.
with suppress(Exception):
display(Image(graph.get_graph().draw_mermaid_png()))
Let's test the agent with a single call
await graph.ainvoke(
{
'messages': [
{
'role': 'user',
'content': 'What sizes do the TinyBirds Wool Runners in Natural Black come in?',
}
],
'user_name': user_name,
'user_node_uuid': user_node_uuid,
},
config={'configurable': {'thread_id': uuid.uuid4().hex}},
)
At this stage, the graph would look something like this. The jess node is INTERESTED_IN the TinyBirds Wool Runner node. The image below was generated using Neo4j Desktop.
display(Image(filename='tinybirds-jess.png', width=850))
The following code will run the agent in an event loop. Just enter a message into the box and click submit.
conversation_output = widgets.Output()
config = {'configurable': {'thread_id': uuid.uuid4().hex}}
user_state = {'user_name': user_name, 'user_node_uuid': user_node_uuid}
async def process_input(user_state: State, user_input: str):
conversation_output.append_stdout(f'\nUser: {user_input}\n')
conversation_output.append_stdout('\nAssistant: ')
graph_state = {
'messages': [{'role': 'user', 'content': user_input}],
'user_name': user_state['user_name'],
'user_node_uuid': user_state['user_node_uuid'],
}
try:
async for event in graph.astream(
graph_state,
config=config,
):
for value in event.values():
if 'messages' in value:
last_message = value['messages'][-1]
if isinstance(last_message, AIMessage) and isinstance(
last_message.content, str
):
conversation_output.append_stdout(last_message.content)
except Exception as e:
conversation_output.append_stdout(f'Error: {e}')
def on_submit(b):
user_input = input_box.value
input_box.value = ''
asyncio.create_task(process_input(user_state, user_input))
input_box = widgets.Text(placeholder='Type your message here...')
submit_button = widgets.Button(description='Send')
submit_button.on_click(on_submit)
conversation_output.append_stdout('Assistant: Hello, how can I help you find shoes today?')
display(widgets.VBox([input_box, submit_button, conversation_output]))