docs/src/content/docs/connectors/neo4j.mdx
The neo4j connector writes records to Neo4j, a property graph database. It supports node tables (labels), relationship tables (edge types), per-database multitenancy (one Neo4j cluster, many isolated databases), real Cypher uniqueness constraints, and vector indexes via the `CREATE VECTOR INDEX` DDL form.
```python
from cocoindex.connectors import neo4j
```
:::note[Dependencies]
This connector requires additional dependencies. Install with:

```sh
pip install "cocoindex[neo4j]"
```

Targets Neo4j 5.18+. Vector-index DDL (`CREATE VECTOR INDEX … OPTIONS { indexConfig: { … } }`) shipped in 5.18 — older 5.x servers will reject the DDL the connector emits.
:::
Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the Bolt URI, optional auth, and the target database name; it lazily opens a Neo4j async driver and returns a graph handle on demand.
:::note
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.
:::
```python
from collections.abc import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import neo4j

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        neo4j.ConnectionFactory(
            uri="bolt://localhost:7687",
            auth=("neo4j", "cocoindex"),
            database="neo4j",
        ),
    )
    yield
```
`auth` is optional — omit it for unauthenticated dev instances. `database` defaults to `"neo4j"` (the default database that ships with every Neo4j 5 installation).
A single Neo4j cluster can host many isolated databases. Pair each database with its own ContextKey and ConnectionFactory(database=...):
```python
KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "bolt://localhost:7687"
    auth = ("neo4j", "cocoindex")
    builder.provide(KG_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="kg"))
    builder.provide(APIS_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="apis"))
    yield
```
Different `ContextKey`s with different `database` names produce fully separate target-state trees — changes to one never spill into the other.
The neo4j connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.
Each apply batch is wrapped in a single Neo4j transaction (`tx.commit()` on success, rollback on exception), so partial writes never leak into the database. Within a batch, writes are ordered as node upserts → relation upserts → relation deletes → node deletes, so dependent edges always see their endpoints.
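That four-phase ordering can be expressed as a small standalone sketch. The phase names and `ordered_ops` helper here are illustrative, not connector internals:

```python
# Illustrative sketch of the per-batch write ordering described above.
PHASES = ["node_upserts", "relation_upserts", "relation_deletes", "node_deletes"]

def ordered_ops(batch: dict[str, list[str]]) -> list[str]:
    """Flatten a batch into execution order: edges are written after
    their endpoint nodes exist, and removed before any node deletes."""
    return [op for phase in PHASES for op in batch.get(phase, [])]

batch = {
    "node_deletes": ["DELETE stale Document"],
    "node_upserts": ["MERGE Document", "MERGE Entity"],
    "relation_upserts": ["MERGE MENTION"],
}
print(ordered_ops(batch))
# ['MERGE Document', 'MERGE Entity', 'MERGE MENTION', 'DELETE stale Document']
```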
Declares a node label as a target state. Returns a `TableTarget` for declaring records.

```python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```
Parameters:

- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see Table Schema). The schema participates in CocoIndex's fingerprint (so two flows declaring the same label must agree); per-property type DDL is not emitted in v1.
- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).

Returns: A pending `TableTarget`. Use `await neo4j.mount_table_target(KG_DB, ...)` to get a resolved target.
Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):

```python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None
```
Parameters:

- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.

`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges.

```python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```
Parameters:

- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the source endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the target endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship's own properties. The relationship's `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).

Returns: A pending `RelationTarget`. Use `await neo4j.mount_relation_target(KG_DB, ...)` to get a resolved target.
Once a `RelationTarget` is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.

```python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None
```
Parameters:

- `from_id` — The source node's primary-key value. The connector `MERGE`s `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent.
- `to_id` — The target node's primary-key value. Same `MERGE` behavior.
- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided.

If `record` is omitted, the connector derives a deterministic edge id of the form `{from_label}_{from_id}_{to_label}_{to_id}`. Convenient when an edge has no properties of its own.
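The derived-id template can be reproduced in a few lines. This is a minimal sketch of the documented naming rule, not the connector's actual code:

```python
def derived_edge_id(from_label: str, from_id: object, to_label: str, to_id: object) -> str:
    # Documented default when `record` is omitted:
    # {from_label}_{from_id}_{to_label}_{to_id}
    return f"{from_label}_{from_id}_{to_label}_{to_id}"

print(derived_edge_id("Entity", "CocoIndex", "Entity", "Neo4j"))
# Entity_CocoIndex_Entity_Neo4j
```

Because the id is a pure function of the two endpoints, re-declaring the same property-less edge on every run is idempotent.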
Declares a vector index on a column of a node table. Vector indexes are an attachment to a `TableTarget`:

```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean"] = "cosine",
    dimension: int,
) -> None
```
Parameters:

- `name` — Optional logical name for the index. Defaults to `f"vec_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"` or `"euclidean"`. Translated to Neo4j's `vector.similarity_function` option.
- `dimension` — The vector's dimension. Required.

The connector emits:
```cypher
CREATE VECTOR INDEX `coco_vec_<Label>__<field>` IF NOT EXISTS
FOR (n:`Label`) ON n.`field`
OPTIONS { indexConfig: {
  `vector.dimensions`: <N>,
  `vector.similarity_function`: '<metric>'
} }
```
Vectors are float32 only.
Build a `TableSchema` by introspecting a record type:

```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, Neo4jType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
Parameters:

- `record_type` — A dataclass, NamedTuple, or Pydantic model.
- `primary_key` — Field name to use as the table's primary key. Defaults to `"id"`.
- `column_overrides` — Optional dict mapping field names to `Neo4jType` or `VectorSchemaProvider` to override the default Python-to-Neo4j type mapping.

Returns: A `TableSchema[RowT]` populated from the class's fields.
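Conceptually, `from_class` walks the record type's fields and assigns each a Neo4j type string. A rough standalone sketch of that introspection — the `PY_TO_NEO4J` dict here is a simplified stand-in for the full default mapping, not the connector's implementation:

```python
import dataclasses

# Simplified stand-in for the default Python-to-Neo4j mapping (see table below).
PY_TO_NEO4J = {str: "STRING", int: "INTEGER", float: "FLOAT", bool: "BOOLEAN"}

@dataclasses.dataclass
class Document:
    filename: str
    title: str
    published: bool

# Introspect the dataclass fields into column-name -> type-string pairs.
columns = {f.name: PY_TO_NEO4J.get(f.type, "ANY") for f in dataclasses.fields(Document)}
print(columns)
# {'filename': 'STRING', 'title': 'STRING', 'published': 'BOOLEAN'}
```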
Most types pass through native Bolt encoding — no per-value transform applied:
| Python type | Neo4j type | Notes |
|---|---|---|
| `bool` | `BOOLEAN` | |
| `int`, NumPy integer scalars | `INTEGER` | |
| `float`, NumPy float scalars | `FLOAT` | |
| `decimal.Decimal` | `STRING` | Encoded via `str()` — Neo4j has no decimal type. |
| `str` | `STRING` | |
| `bytes` | `BYTES` | Native Bolt type — no encoder. |
| `uuid.UUID` | `STRING` | Encoded via `str()`. |
| `datetime.date` | `DATE` | Native Bolt type. |
| `datetime.datetime` | `ZONED_DATETIME` | Native Bolt type. |
| `datetime.time` | `LOCAL_TIME` | Native Bolt type. |
| `datetime.timedelta` | `DURATION` | Native Bolt type. |
| `numpy.ndarray` (with `VectorSchema` annotation) | `LIST<FLOAT>` | Encoded via `tolist()`; paired with vector-index DDL. |
| `dict`, `list`, nested record, `Any` | `MAP` / `LIST<ANY>` | Passed through native parameter binding. |
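For the string-encoded rows in the table, the encoding is plain `str()` as documented, so the stored value is easy to predict:

```python
import uuid
from decimal import Decimal

# Decimal and UUID have no native Bolt encoding, so the connector
# stores them as STRING via str(), per the table above.
print(str(Decimal("19.99")))
# 19.99
print(str(uuid.UUID("12345678-1234-5678-1234-567812345678")))
# 12345678-1234-5678-1234-567812345678
```

Note this means range queries on `Decimal`-backed properties compare lexicographically in Cypher, not numerically.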
Override the default mapping for a single column with `Neo4jType`:

```python
class Neo4jType(NamedTuple):
    neo4j_type: str
    encoder: ValueEncoder | None = None
```
Use with `typing.Annotated`:

```python
from dataclasses import dataclass
from typing import Annotated

from cocoindex.connectors.neo4j import Neo4jType

@dataclass
class Row:
    id: str
    score: Annotated[float, Neo4jType("STRING", encoder=str)]
```
The `neo4j_type` string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree), but no per-property type DDL is emitted from it.
For NumPy `ndarray` columns, attach a `VectorSchema` annotation to specify dtype + dimension. See VectorSchema for details.
Build a `TableSchema` directly from a dict of column definitions when the row type is dynamic:

```python
from cocoindex.connectors.neo4j import ColumnDef, TableSchema

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="STRING"),
        "title": ColumnDef(type="STRING"),
        "summary": ColumnDef(type="STRING", nullable=True),
    },
    primary_key="filename",
)
```
`ColumnDef` fields:

- `type` — The Neo4j type string (metadata only; see table above).
- `nullable` — Whether the column may be `None`. Defaults to `True`.
- `encoder` — Optional `Callable[[Any], Any]` applied to non-None values before they're sent to Neo4j.

For each managed table, the connector creates supporting Cypher artifacts on first run:
```cypher
CREATE CONSTRAINT `coco_uniq_<Label>__<pk>` IF NOT EXISTS
FOR (n:`<Label>`) REQUIRE n.`<pk>` IS UNIQUE
```

The uniqueness constraint is backed by an index, so a separate `CREATE INDEX` is redundant on nodes. Relationships get a plain index instead:

```cypher
CREATE INDEX `coco_idx_rel_<RelType>__<pk>` IF NOT EXISTS
FOR ()-[r:`<RelType>`]-() ON (r.`<pk>`)
```
Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared.
When `managed_by="user"` is set, the connector skips DDL entirely — you're responsible for creating and dropping the schema. Record-level upserts and deletes still work.
```python
from collections.abc import AsyncIterator
from dataclasses import dataclass

import cocoindex as coco
from cocoindex.connectors import neo4j

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")

@dataclass
class Document:
    filename: str
    title: str
    summary: str

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, neo4j.ConnectionFactory(
        uri="bolt://localhost:7687",
        auth=("neo4j", "cocoindex"),
        database="neo4j",
    ))
    yield

@coco.fn
async def app_main() -> None:
    schema = await neo4j.TableSchema.from_class(Document, primary_key="filename")
    documents = await neo4j.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )

app = coco.App(coco.AppConfig(name="docs_to_neo4j"), app_main)
```
```python
@dataclass
class Entity:
    value: str

@dataclass
class RelationshipRow:
    id: str
    predicate: str

@coco.fn
async def kg_app_main() -> None:
    documents = await neo4j.mount_table_target(
        KG_DB, "Document",
        await neo4j.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await neo4j.mount_table_target(
        KG_DB, "Entity",
        await neo4j.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await neo4j.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await neo4j.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="Neo4j"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="Neo4j",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )

kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)
```
The Entity table is declared up-front (via `mount_table_target`) so its uniqueness constraint is reconciled before any RELATIONSHIP edge MERGEs entity endpoints. The relationship's triple-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it's good practice to declare them explicitly so deletion-cascade behavior stays predictable.