Back to Graphrag

CosmosDB Vector Store Example

packages/graphrag-vectors/example_notebooks/cosmosdb.ipynb

3.0.98.0 KB
Original Source

CosmosDB Vector Store Example

This notebook demonstrates the CosmosDBVectorStore from graphrag_vectors, including:

  • Loading documents with metadata and embeddings
  • Similarity search with field selection
  • Metadata filtering using the F filter builder
  • Timestamp-based filtering on exploded date fields
  • Document update and removal

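Prerequisites: Set COSMOSDB_CONNECTION_STRING in your .env file, or run the local Cosmos DB emulator. When the variable is not set, the notebook falls back to the emulator connection string defined in the setup cell.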

python
import os
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv
from graphrag_vectors import F, VectorStoreDocument
from graphrag_vectors.cosmosdb import CosmosDBVectorStore

# Read variables from a local .env file into the environment
load_dotenv()

# Sample data: text units plus their embeddings, stored as two parquet files
data_dir = Path("data")
text_units_path = data_dir / "text_units.parquet"
embeddings_path = data_dir / "embeddings.text_unit_text.parquet"

text_units = pd.read_parquet(text_units_path)
embeddings = pd.read_parquet(embeddings_path)
# Join the embeddings onto the text units through the shared "id" column
text_units = text_units.merge(embeddings, on="id")

column_names = text_units.columns.tolist()
print(f"Loaded {len(text_units)} text units with columns: {column_names}")
python
# Set up the CosmosDB vector store and open the connection
# Connection string for the local emulator (the emulator must be running in Docker)
EMULATOR_CONNECTION_STRING = (
    "AccountEndpoint=http://localhost:8081/;"
    "AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
)
# Prefer the connection string from the environment; otherwise use the emulator
connection_string = os.getenv("COSMOSDB_CONNECTION_STRING", EMULATOR_CONNECTION_STRING)

# Metadata fields declared on the store, mapped to their type names
metadata_fields = {"os": "str", "category": "str", "timestamp": "date"}
store = CosmosDBVectorStore(
    connection_string=connection_string,
    database_name="graphrag_vectors",
    index_name="text_units",
    fields=metadata_fields,
)
store.connect()
store.create_index()


def _row_to_document(row):
    """Wrap one row of ``text_units`` in a VectorStoreDocument."""
    return VectorStoreDocument(
        id=row["id"],
        vector=row["embedding"].tolist(),
        data=row.to_dict(),
        create_date=row.get("timestamp"),
    )


# Convert every row into a document and load them all in one call
docs = [_row_to_document(row) for _, row in text_units.iterrows()]
store.load_documents(docs)
print(f"Loaded {len(docs)} documents into store")
python
# Test count
count = store.count()
print(f"Document count: {count}")
# Derive the expectation from the loaded data rather than hard-coding the size
# of the bundled sample, so the check still holds if the sample files change.
# Assumes the store keeps one document per distinct id - TODO confirm.
expected_count = text_units["id"].nunique()
assert count == expected_count, f"Expected {expected_count}, got {count}"

Use similarity_search_by_vector to find the closest documents to a query embedding. The select parameter controls which metadata fields are returned in results.

python
# Query with the embedding of the first text unit
first_row = text_units.iloc[0]
query_vector = first_row["embedding"].tolist()

# Basic search - returns all fields
results = store.similarity_search_by_vector(query_vector, k=3)
print(f"Found {len(results)} results:")
for hit in results:
    field_names = list(hit.document.data.keys())
    print(f"  - {hit.document.id}: score={hit.score:.4f}, data keys={field_names}")

# Search again with select so that only the 'os' field is returned
results = store.similarity_search_by_vector(query_vector, k=1, select=["os"])
result = results[0]
print("\nWith select=['os']:")
selected_data = result.document.data
print(f"  Data fields: {selected_data}")
# 'os' was requested and must be present; 'category' was not and must be absent
assert "os" in selected_data, "Expected 'os' field in data"
assert "category" not in selected_data, "Expected 'category' to be excluded"
print("  Select parameter confirmed - only 'os' field returned.")

Metadata Filtering

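Use the F filter builder to construct filter expressions with ==, !=, >, <, >=, <=. Combine them with & (AND), | (OR), and ~ (NOT), wrapping each comparison in parentheses because Python's &, |, and ~ operators bind more tightly than comparison operators. Filters are compiled to CosmosDB SQL WHERE clauses.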

python
def _show_matches(heading, filters, k, describe):
    """Run one filtered similarity search and print a line per match.

    heading: banner printed before the search runs.
    filters: F expression handed to the store as ``filters``.
    k: value handed to the store as ``k``.
    describe: callable that turns one search result into the text shown
        after the document id.

    Returns whatever the store returned for the search.
    """
    print(heading)
    matches = store.similarity_search_by_vector(query_vector, k=k, filters=filters)
    print(f"Found {len(matches)} results:")
    for match in matches:
        print(f"  - {match.document.id}: {describe(match)}")
    return matches


# Filter by a single field
filtered = _show_matches(
    heading="=== Filter: os == 'windows' ===",
    filters=F.os == "windows",
    k=5,
    describe=lambda m: f"os={m.document.data.get('os')}, score={m.score:.4f}",
)

# Compound filter with AND
filtered = _show_matches(
    heading="\n=== Filter: os == 'windows' AND category == 'bug' ===",
    filters=(F.os == "windows") & (F.category == "bug"),
    k=5,
    describe=lambda m: (
        f"os={m.document.data.get('os')}, category={m.document.data.get('category')}"
    ),
)

# OR filter
filtered = _show_matches(
    heading="\n=== Filter: category == 'bug' OR category == 'feature' ===",
    filters=(F.category == "bug") | (F.category == "feature"),
    k=5,
    describe=lambda m: f"category={m.document.data.get('category')}",
)

# NOT filter
filtered = _show_matches(
    heading="\n=== Filter: NOT os == 'linux' ===",
    filters=~(F.os == "linux"),
    k=3,
    describe=lambda m: f"os={m.document.data.get('os')}",
)

# Print the compiled form of a compound filter for debugging
filter_expr = (F.os == "windows") & (F.category == "bug")
compiled_filter = store._compile_filter(filter_expr)
print(f"\nCompiled CosmosDB filter: {compiled_filter}")

Timestamp Filtering

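Date fields (declared as "date" in the fields dict) are automatically exploded into filterable components, each named by appending a suffix to the field name: _year, _month, _day, _hour, _day_of_week, _quarter. For example, the month of create_date is filterable as create_date_month.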

The built-in create_date and update_date fields are also exploded automatically.

python
from datetime import datetime, timedelta

def _print_matches(matches, describe):
    """Print the result count, then one line per match.

    describe: callable that turns a match's document into the text shown
        after the document id.
    """
    print(f"Found {len(matches)} results:")
    for match in matches:
        print(f"  - {match.document.id}: {describe(match.document)}")


# Exploded month component: documents created in December
print("=== Filter: create_date_month == 12 (December) ===")
in_december = F.create_date_month == 12
filtered = store.similarity_search_by_vector(query_vector, k=5, filters=in_december)
_print_matches(
    filtered,
    lambda doc: (
        f"create_date={doc.create_date}, month={doc.data.get('create_date_month')}"
    ),
)

# Exploded day-of-week component, compared against the day name
print("\n=== Filter: create_date_day_of_week == 'Monday' ===")
on_monday = F.create_date_day_of_week == "Monday"
filtered = store.similarity_search_by_vector(query_vector, k=5, filters=on_monday)
_print_matches(filtered, lambda doc: f"day={doc.data.get('create_date_day_of_week')}")

# Exploded quarter component
print("\n=== Filter: create_date_quarter == 4 (Q4) ===")
in_q4 = F.create_date_quarter == 4
filtered = store.similarity_search_by_vector(query_vector, k=5, filters=in_q4)
_print_matches(filtered, lambda doc: f"quarter={doc.data.get('create_date_quarter')}")

# Range query on the raw create_date, using an ISO-formatted cutoff 90 days back
ninety_days_ago = datetime.now() - timedelta(days=90)
cutoff = ninety_days_ago.isoformat()
# Only the date part of the cutoff (first 10 characters) is shown in the banner
print(f"\n=== Filter: create_date >= '{cutoff[:10]}...' (last 90 days) ===")
since_cutoff = F.create_date >= cutoff
filtered = store.similarity_search_by_vector(query_vector, k=5, filters=since_cutoff)
_print_matches(filtered, lambda doc: f"create_date={doc.create_date}")

# Print the compiled form of a few filters for debugging
month_compiled = store._compile_filter(F.create_date_month == 12)
print(f"\nCompiled month filter:    {month_compiled}")
range_compiled = store._compile_filter(F.create_date >= cutoff)
print(f"Compiled range filter:    {range_compiled}")
compound_compiled = store._compile_filter(
    (F.create_date_quarter == 4) & (F.update_date_day_of_week == "Monday")
)
print(f"Compiled compound filter: {compound_compiled}")

Document Update and Removal

Use update() to modify a document's metadata and remove() to delete documents by ID.

python
# Pick the first text unit and show its current 'os' value
doc_id = text_units["id"].iloc[0]
original = store.search_by_id(doc_id)
original_os = original.data.get("os")
print(f"Original os: {original_os}")

# Send an update that carries only the new 'os' value and no vector
new_os = "updated-os-value"
updated_doc = VectorStoreDocument(id=doc_id, vector=None, data={"os": new_os})
store.update(updated_doc)

# Read the document back and confirm the new value is stored
result = store.search_by_id(doc_id)
stored_os = result.data.get("os")
print(f"Updated os: {stored_os}")
assert stored_os == new_os, "Update failed"
print("Update confirmed.")
python
# Remove documents
ids_to_delete = text_units["id"].head(5).tolist()
print(f"Deleting {len(ids_to_delete)} documents...")

store.remove(ids_to_delete)

new_count = store.count()
print(f"Document count after delete: {new_count}")
# Derive the expectation from the loaded data rather than hard-coding it, so
# the check survives a change of sample files and still passes if this cell
# is run a second time.
# Assumes the store keeps one document per distinct id - TODO confirm.
expected_count = text_units["id"].nunique() - len(set(ids_to_delete))
assert new_count == expected_count, f"Expected {expected_count}, got {new_count}"
print("Remove confirmed.")