docs/examples/cookbooks/build_knowledge_graph_with_neo4j_llamacloud.ipynb
RAG is a powerful technique for enhancing LLMs with external knowledge, but traditional semantic similarity search often fails to capture nuanced relationships between entities and can miss critical context that spans multiple documents. By transforming unstructured documents into structured knowledge representations, we can perform complex graph traversals, relationship queries, and contextual reasoning that go beyond simple similarity matching.
Tools like LlamaParse, LlamaClassify and LlamaExtract provide robust parsing, classification and extraction capabilities to convert raw documents into structured data, while Neo4j serves as the backbone for knowledge graph representation, forming the foundation of GraphRAG architectures that can understand not just what information exists, but how it all connects together.
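In this end-to-end tutorial, we will walk through an example of legal document processing that showcases the full pipeline.
The pipeline contains the following steps: parsing the contract PDF with LlamaParse, classifying the contract type with LlamaClassify, extracting structured fields with LlamaExtract, and importing the extracted data into Neo4j as a knowledge graph.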
!pip install llama-index-workflows llama-cloud-services jsonschema openai neo4j llama-index-llms-openai
import re
import uuid
from datetime import date
from typing import List, Optional
from getpass import getpass
from neo4j import AsyncGraphDatabase
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from llama_cloud_services.beta.classifier.client import ClassifyClient
from llama_cloud.types import ClassifierRule
from llama_cloud.client import AsyncLlamaCloud
from llama_cloud_services.extract import (
ExtractConfig,
ExtractMode,
LlamaExtract,
SourceText,
)
from llama_cloud_services.parse import LlamaParse
from llama_index.llms.openai import OpenAI
Here, we download a sample PDF from the CUAD dataset.
!wget https://raw.githubusercontent.com/tomasonjo/blog-datasets/5e3939d10216b7663687217c1646c30eb921d92f/CybergyHoldingsInc_Affliate%20Agreement.pdf
For Neo4j, the simplest approach is to create a free Aura database instance, and copy your credentials here.
db_url = "your-db-url"
username = "neo4j"
password = "your-password"
neo4j_driver = AsyncGraphDatabase.driver(
db_url,
auth=(
username,
password,
),
)
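Hard-coded credentials are easy to leak when a notebook is shared. As a minimal sketch, the connection settings could instead be read from environment variables. The variable names `NEO4J_URI`, `NEO4J_USERNAME` and `NEO4J_PASSWORD`, as well as the `Neo4jSettings` and `load_neo4j_settings` helpers, are assumptions made for this illustration and are not required by the Neo4j driver.

```python
import os
from typing import Mapping, NamedTuple


class Neo4jSettings(NamedTuple):
    """Connection settings for the Neo4j driver."""

    url: str
    username: str
    password: str


def load_neo4j_settings(env: Mapping[str, str] = os.environ) -> Neo4jSettings:
    """Read connection settings from environment variables.

    The variable names are assumed for this sketch; adjust them to
    whatever your deployment uses.
    """
    missing = [k for k in ("NEO4J_URI", "NEO4J_PASSWORD") if not env.get(k)]
    if missing:
        raise RuntimeError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return Neo4jSettings(
        url=env["NEO4J_URI"],
        # Fall back to the default user name used in the cell above.
        username=env.get("NEO4J_USERNAME", "neo4j"),
        password=env["NEO4J_PASSWORD"],
    )
```

The resulting values can then be passed to the driver in place of the literals, for example `AsyncGraphDatabase.driver(settings.url, auth=(settings.username, settings.password))`.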
Next, we set up LlamaParse and parse the PDF. In this case, we're using parse_page_without_llm mode.
import os
os.environ["LLAMA_CLOUD_API_KEY"] = getpass("Enter your Llama API key: ")
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
# Initialize parser with specified mode
parser = LlamaParse(parse_mode="parse_page_without_llm")
# Define the PDF file to parse
pdf_path = "CybergyHoldingsInc_Affliate Agreement.pdf"
# Parse the document asynchronously
results = await parser.aparse(pdf_path)
print(results.pages[0].text)
In this example, we want to classify incoming contracts as either Affiliate Agreements or Co Branding. For this cookbook, we are going to use LlamaClassify, our AI-powered document classification tool that's currently in beta!
rules = [
ClassifierRule(
type="Affiliate Agreements",
description="This is a contract that outlines an affiliate agreement",
),
ClassifierRule(
type="Co Branding",
description="This is a contract that outlines a co-branding deal/agreement",
),
]
classifier = ClassifyClient(
client=AsyncLlamaCloud(
base_url="https://api.cloud.llamaindex.ai",
token=os.environ["LLAMA_CLOUD_API_KEY"],
)
)
result = await classifier.aclassify_file_path(
rules=rules,
file_input_path=pdf_path,
)
classification = result.items[0].result
print("Classification Result: " + classification.type)
print("Classification Reason: " + classification.reasoning)
Next, we define some schemas that we can use to extract relevant information from our contracts. The fields we define are a mix of summarization and structured data extraction.
Here we define two Pydantic models: Location captures structured address information with optional fields for country, state, and address, while Party represents contract parties with a required name and optional location details. The Field descriptions help guide the extraction process by telling the LLM exactly what information to look for in each field.
class Location(BaseModel):
    """Location information with structured address components."""

    country: Optional[str] = Field(None, description="Country")
    state: Optional[str] = Field(None, description="State or province")
    address: Optional[str] = Field(None, description="Street address or city")


class Party(BaseModel):
    """Party information with name and location."""

    name: str = Field(description="Party name")
    location: Optional[Location] = Field(
        None, description="Party location details"
    )
Remember we have multiple contract types, so we need to define specific extraction schemas for each type and create a mapping system to dynamically select the appropriate schema based on our classification result.
class BaseContract(BaseModel):
    """Base contract class with common fields."""

    parties: Optional[List[Party]] = Field(
        None, description="All contracting parties"
    )
    agreement_date: Optional[str] = Field(
        None, description="Contract signing date. Use YYYY-MM-DD"
    )
    effective_date: Optional[str] = Field(
        None, description="When contract becomes effective. Use YYYY-MM-DD"
    )
    expiration_date: Optional[str] = Field(
        None, description="Contract expiration date. Use YYYY-MM-DD"
    )
    governing_law: Optional[str] = Field(
        None, description="Governing jurisdiction"
    )
    termination_for_convenience: Optional[bool] = Field(
        None, description="Can terminate without cause"
    )
    anti_assignment: Optional[bool] = Field(
        None, description="Restricts assignment to third parties"
    )
    cap_on_liability: Optional[str] = Field(
        None, description="Liability limit amount"
    )


class AffiliateAgreement(BaseContract):
    """Affiliate Agreement extraction."""

    exclusivity: Optional[str] = Field(
        None, description="Exclusive territory or market rights"
    )
    non_compete: Optional[str] = Field(
        None, description="Non-compete restrictions"
    )
    revenue_profit_sharing: Optional[str] = Field(
        None, description="Commission or revenue split"
    )
    minimum_commitment: Optional[str] = Field(
        None, description="Minimum sales targets"
    )


class CoBrandingAgreement(BaseContract):
    """Co-Branding Agreement extraction."""

    exclusivity: Optional[str] = Field(
        None, description="Exclusive co-branding rights"
    )
    ip_ownership_assignment: Optional[str] = Field(
        None, description="IP ownership allocation"
    )
    license_grant: Optional[str] = Field(
        None, description="Brand/trademark licenses"
    )
    revenue_profit_sharing: Optional[str] = Field(
        None, description="Revenue sharing terms"
    )


mapping = {
    "affiliate_agreements": AffiliateAgreement,
    "co_branding": CoBrandingAgreement,
}
extractor = LlamaExtract()
agent = extractor.create_agent(
name=f"extraction_workflow_import_{uuid.uuid4()}",
data_schema=mapping[classification.type],
config=ExtractConfig(
extraction_mode=ExtractMode.BALANCED,
),
)
result = await agent.aextract(
files=SourceText(
text_content=" ".join([el.text for el in results.pages]),
filename=pdf_path,
),
)
result.data
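The classifier rules declare the types as "Affiliate Agreements" and "Co Branding", while the keys of `mapping` are written in snake_case, so the lookup `mapping[classification.type]` depends on the exact form of the label the classifier returns. A minimal sketch of a more forgiving lookup follows. `normalize_label` and `select_schema` are hypothetical helpers introduced here for illustration and are not part of any LlamaCloud library.

```python
import re
from typing import Dict, Optional, TypeVar

T = TypeVar("T")


def normalize_label(label: str) -> str:
    """Lowercase a label and collapse non-alphanumeric runs into underscores."""
    return re.sub(r"[^a-z0-9]+", "_", label.strip().lower()).strip("_")


def select_schema(label: str, schemas: Dict[str, T]) -> Optional[T]:
    """Return the schema registered for a label, or None if nothing matches.

    Assumes the keys of `schemas` are already in snake_case, as in the
    `mapping` dictionary defined earlier.
    """
    return schemas.get(normalize_label(label))
```

With this in place, `select_schema(classification.type, mapping)` resolves both "Affiliate Agreements" and "affiliate_agreements" to the same schema, and returns `None` for an unexpected label instead of raising a `KeyError`.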
The final step is to take our extracted structured information and build a knowledge graph that represents the relationships between contract entities. We need to define a graph model that specifies how our contract data should be organized as nodes and relationships in Neo4j.
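Our graph model consists of three main node types: Contract, Party, and Location. Each Contract is linked to its parties through HAS_PARTY relationships, and each Party is linked to its Location through a HAS_LOCATION relationship.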
Now we'll import our extracted contract data into Neo4j according to our defined graph model.
import_query = """
WITH $contract AS contract
MERGE (c:Contract {path: $path})
SET c += apoc.map.clean(contract, ["parties", "agreement_date", "effective_date", "expiration_date"], [])
// Cast to date
SET c.agreement_date = date(contract.agreement_date),
c.effective_date = date(contract.effective_date),
c.expiration_date = date(contract.expiration_date)
// Create parties with their locations
WITH c, contract
UNWIND coalesce(contract.parties, []) AS party
MERGE (p:Party {name: party.name})
MERGE (c)-[:HAS_PARTY]->(p)
// Create location nodes and link to parties
WITH p, party
WHERE party.location IS NOT NULL
MERGE (p)-[:HAS_LOCATION]->(l:Location)
SET l += party.location
"""
response = await neo4j_driver.execute_query(
import_query, contract=result.data, path=pdf_path
)
response.summary.counters
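The import query casts the three date fields with Cypher's `date()` function, which is expected to fail on a string that is not a valid ISO date. Since the values come from an LLM-based extraction, it may be worth checking them before the import. The following is a minimal sketch, assuming the extracted data is a plain dictionary with the field names from `BaseContract`. `sanitize_dates` is a hypothetical helper introduced here and is not part of the original pipeline.

```python
from datetime import date
from typing import Any, Dict

# Field names taken from the BaseContract schema defined earlier.
DATE_FIELDS = ("agreement_date", "effective_date", "expiration_date")


def sanitize_dates(contract: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy in which every date field is an ISO string or None.

    Values that are missing, not strings, or not parseable as ISO dates
    are replaced with None, so the Cypher date() cast receives either a
    well-formed date string or null.
    """
    cleaned = dict(contract)
    for field in DATE_FIELDS:
        value = cleaned.get(field)
        try:
            cleaned[field] = date.fromisoformat(value).isoformat()
        except (TypeError, ValueError):
            cleaned[field] = None
    return cleaned
```

The cleaned dictionary can then be passed as the query parameter, for example `contract=sanitize_dates(result.data)`.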
Finally, we can combine all of this logic into one single executable agentic workflow. Let's make it so that the workflow can run by accepting a single PDF, adding new entries to our Neo4j graph each time.
affiliate_extraction_agent = extractor.create_agent(
name="Affiliate_Extraction",
data_schema=AffiliateAgreement,
config=ExtractConfig(
extraction_mode=ExtractMode.BALANCED,
),
)
cobranding_extraction_agent = extractor.create_agent(
name="CoBranding_Extraction",
data_schema=CoBrandingAgreement,
config=ExtractConfig(
extraction_mode=ExtractMode.BALANCED,
),
)
from llama_index.core.workflow import (
Workflow,
Event,
Context,
StartEvent,
StopEvent,
step,
)
class ClassifyDocEvent(Event):
    parsed_doc: str
    pdf_path: str


class ExtractAffiliate(Event):
    file_path: str


class ExtractCoBranding(Event):
    file_path: str


class BuildGraph(Event):
    file_path: str
    data: dict
class KnowledgeGraphBuilder(Workflow):
    def __init__(
        self,
        classifier: ClassifyClient,
        rules: List[ClassifierRule],
        affiliate_extract_agent: LlamaExtract,
        cobranding_extract_agent: LlamaExtract,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.classifier = classifier
        self.rules = rules
        self.affiliate_extract_agent = affiliate_extract_agent
        self.cobranding_extract_agent = cobranding_extract_agent

    @step
    async def classify_contract(
        self, ctx: Context, ev: StartEvent
    ) -> ExtractAffiliate | ExtractCoBranding | StopEvent:
        result = await self.classifier.aclassify_file_path(
            rules=self.rules, file_input_path=ev.pdf_path
        )
        contract_type = result.items[0].result.type
        print(contract_type)
        if contract_type == "affiliate_agreements":
            return ExtractAffiliate(file_path=ev.pdf_path)
        elif contract_type == "co_branding":
            return ExtractCoBranding(file_path=ev.pdf_path)
        else:
            return StopEvent()

    @step
    async def extract_affiliate(
        self, ctx: Context, ev: ExtractAffiliate
    ) -> BuildGraph:
        result = await self.affiliate_extract_agent.aextract(ev.file_path)
        return BuildGraph(data=result.data, file_path=ev.file_path)

    @step
    async def extract_co_branding(
        self, ctx: Context, ev: ExtractCoBranding
    ) -> BuildGraph:
        result = await self.cobranding_extract_agent.aextract(ev.file_path)
        return BuildGraph(data=result.data, file_path=ev.file_path)

    @step
    async def build_graph(self, ctx: Context, ev: BuildGraph) -> StopEvent:
        import_query = """
        WITH $contract AS contract
        MERGE (c:Contract {path: $path})
        SET c += apoc.map.clean(contract, ["parties", "agreement_date", "effective_date", "expiration_date"], [])
        // Cast to date
        SET c.agreement_date = date(contract.agreement_date),
        c.effective_date = date(contract.effective_date),
        c.expiration_date = date(contract.expiration_date)
        // Create parties with their locations
        WITH c, contract
        UNWIND coalesce(contract.parties, []) AS party
        MERGE (p:Party {name: party.name})
        MERGE (c)-[:HAS_PARTY]->(p)
        // Create location nodes and link to parties
        WITH p, party
        WHERE party.location IS NOT NULL
        MERGE (p)-[:HAS_LOCATION]->(l:Location)
        SET l += party.location
        """
        response = await neo4j_driver.execute_query(
            import_query, contract=ev.data, path=ev.file_path
        )
        return StopEvent(response.summary.counters)
knowledge_graph_builder = KnowledgeGraphBuilder(
classifier=classifier,
rules=rules,
affiliate_extract_agent=affiliate_extraction_agent,
cobranding_extract_agent=cobranding_extraction_agent,
timeout=None,
verbose=True,
)
response = await knowledge_graph_builder.run(
pdf_path="CybergyHoldingsInc_Affliate Agreement.pdf"
)
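Because the workflow accepts a single PDF per run, a folder of contracts can be processed by running it once per file. The sketch below shows one possible way to do that with bounded concurrency. `process_all` is a hypothetical helper, and `run_one` stands in for any callable that returns an awaitable, such as a small wrapper around `knowledge_graph_builder.run(pdf_path=...)`. The default limit of two concurrent runs is an arbitrary assumption, not a documented service limit.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable


async def process_all(
    pdf_paths: Iterable[str],
    run_one: Callable[[str], Awaitable[Any]],
    max_concurrency: int = 2,
) -> Dict[str, Any]:
    """Run `run_one` for each path, at most `max_concurrency` at a time.

    Exceptions are captured per file and returned in place of a result,
    so one failing PDF does not stop the rest of the batch.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(path: str) -> Any:
        async with semaphore:
            try:
                return await run_one(path)
            except Exception as exc:
                # Return the error so the caller can inspect it later.
                return exc

    paths = list(pdf_paths)
    results = await asyncio.gather(*(guarded(p) for p in paths))
    return dict(zip(paths, results))
```

In a notebook, this could be called as `await process_all(paths, lambda p: knowledge_graph_builder.run(pdf_path=p))`, assuming the object returned by `run` can be awaited as in the cell above. Each value in the returned dictionary is either the workflow result or the exception raised for that file.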