docs/src/content/docs/connectors/zvec.mdx
The zvec connector writes documents to zvec, an embedded, in-process vector database. zvec runs inside your application — no server or daemon — and stores each collection in a directory on disk.
from cocoindex.connectors import zvec
:::note[Installation]
zvec is an optional dependency:
pip install cocoindex[zvec]
:::
connect() creates a ManagedConnection rooted at a base directory. Each collection lives in a subdirectory under it.
def connect(base_path: str | Path, *, enable_mmap: bool = True) -> ManagedConnection
Parameters:
- base_path: Directory under which collections are stored. Created if missing.
- enable_mmap: Whether zvec uses memory-mapped I/O for data files.

ManagedConnection is a handle to the base directory. zvec takes an exclusive write lock per open collection, so ManagedConnection caches open handles by collection name and reuses them.
Methods:
- collection_path(name): Path to a collection's directory.
- close(): Release all open collection handles (drops their write locks).

For a lifespan, use managed_connection(), which closes handles on exit:
def managed_connection(
    base_path: str | Path, *, enable_mmap: bool = True
) -> Iterator[ManagedConnection]
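The handle caching and close-on-exit behaviour described above can be pictured with a small stand-in. This is only a sketch under assumptions: ToyConnection, open_collection, and toy_managed_connection are hypothetical names, a plain object stands in for a real zvec collection handle, and none of it is the connector's actual implementation.

```python
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Iterator, Union


class ToyConnection:
    """Hypothetical stand-in for ManagedConnection; not the real class."""

    def __init__(self, base_path: Union[str, Path]) -> None:
        self.base_path = Path(base_path)
        # Open handles, keyed by collection name.
        self._handles: Dict[str, object] = {}

    def collection_path(self, name: str) -> Path:
        # Each collection lives in a subdirectory under the base directory.
        return self.base_path / name

    def open_collection(self, name: str) -> object:
        # Reuse the cached handle instead of opening the collection twice,
        # which an exclusive per-collection write lock would not allow.
        if name not in self._handles:
            self._handles[name] = object()  # placeholder for a real handle
        return self._handles[name]

    def close(self) -> None:
        # Forget every cached handle; with real handles this is the point
        # where their write locks would be released.
        self._handles.clear()


@contextmanager
def toy_managed_connection(base_path: Union[str, Path]) -> Iterator[ToyConnection]:
    conn = ToyConnection(base_path)
    try:
        yield conn
    finally:
        conn.close()  # runs on normal exit and when the body raises
```

Asking the toy connection for the same collection twice returns the same handle object, and leaving the `with` block empties the cache.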
The zvec connector tracks which documents should exist in a collection and automatically handles upserts and deletions. zvec's native upsert is used directly, and documents are removed by id when they are no longer declared.
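As a rough mental model of that tracking, the sketch below compares the ids written by a previous run with the rows declared in the current one. plan_changes is a hypothetical helper, not part of the connector, and it assumes every declared row is upserted; the real connector may well skip rows that have not changed.

```python
from typing import Dict, Set, Tuple


def plan_changes(
    previous_ids: Set[str], declared: Dict[str, dict]
) -> Tuple[Dict[str, dict], Set[str]]:
    """Return (documents to upsert, ids to delete) for one run."""
    # Every declared document is written with an upsert.
    upserts = dict(declared)
    # Ids that existed before but are no longer declared get removed.
    deletes = previous_ids - set(declared)
    return upserts, deletes


upserts, deletes = plan_changes(
    previous_ids={"a", "b"},
    declared={"b": {"title": "B (edited)"}, "c": {"title": "C"}},
)
print(sorted(upserts), sorted(deletes))
```

Here "b" and "c" are upserted and "a" is deleted because it is no longer declared.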
Create a ContextKey[zvec.ManagedConnection] to identify your connection, then provide it in your lifespan:
:::note
The key name is load-bearing across runs: it's the stable identity CocoIndex uses to track managed documents. See ContextKey as stable identity before renaming.
:::
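from typing import Iterator

import cocoindex as coco
from cocoindex.connectors import zvec

ZVEC_DB = coco.ContextKey[zvec.ManagedConnection]("main_db")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    # Keep the connection open for the app's lifetime; handles close on exit.
    with zvec.managed_connection("./zvec_data") as conn:
        builder.provide(ZVEC_DB, conn)
        yield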
Declares a collection as a target state. Returns a CollectionTarget for declaring documents.
def declare_collection_target(
    db: ContextKey[ManagedConnection],
    collection_name: str,
    schema: CollectionSchema[RowT],
    *,
    managed_by: Literal["system", "user"] = "system",
) -> CollectionTarget[RowT, coco.PendingS]
Parameters:
- db: A ContextKey[ManagedConnection] identifying the connection.
- collection_name: Name of the collection (a subdirectory under the connection's base path).
- schema: Schema definition (see Collection schema).
- managed_by: Whether CocoIndex manages the collection lifecycle ("system", creating and destroying it) or assumes it already exists ("user", documents only).

Returns: A pending CollectionTarget. Use await zvec.mount_collection_target(ZVEC_DB, collection_name, schema) to resolve.
Once a CollectionTarget is resolved, declare documents to be upserted:
def CollectionTarget.declare_row(self, *, row: RowT) -> None
The primary-key value becomes the document id (converted to str).
Define the collection structure using a Python class (dataclass, NamedTuple, or Pydantic model):
@classmethod
async def CollectionSchema.from_class(
    cls,
    record_type: type[RowT],
    primary_key: list[str],
    *,
    column_overrides: dict[str, ZvecType | ZvecVectorDef | ZvecFtsType | VectorSchemaProvider] | None = None,
) -> CollectionSchema[RowT]
Parameters:
- record_type: A record type whose fields define the document structure.
- primary_key: Exactly one column name. Its value becomes the document id.
- column_overrides: Optional per-column overrides for type mapping or vector configuration.

:::note[Single primary key]
zvec documents have a single string id, so primary_key must name exactly one column. Its value is converted to str to form the id. Composite primary keys are not supported.
:::
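The single-key rule and the str conversion can be sketched as follows. document_id is a hypothetical helper written for illustration, and raising ValueError for a composite key is an assumption; the connector may report the problem differently.

```python
import uuid
from dataclasses import dataclass
from typing import Any, List


def document_id(row: Any, primary_key: List[str]) -> str:
    """Derive a document id from the single primary-key column of a row."""
    if len(primary_key) != 1:
        # Assumed error type; composite keys are simply not supported.
        raise ValueError("primary_key must name exactly one column")
    # Whatever type the key column holds, the id is its str() form.
    return str(getattr(row, primary_key[0]))


@dataclass
class Doc:
    id: Any
    title: str


print(document_id(Doc(id=42, title="x"), ["id"]))  # an int key becomes "42"
print(document_id(Doc(id=uuid.UUID(int=1), title="y"), ["id"]))
```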
:::note[At least one vector field]
zvec is a vector database: every collection must declare at least one vector field (dense or sparse).
:::
Example:
from dataclasses import dataclass
from typing import Annotated
import numpy as np
from numpy.typing import NDArray
from cocoindex.connectors import zvec
from cocoindex.resources.schema import VectorSchema

@dataclass
class Doc:
    id: str
    title: str
    year: int
    embedding: Annotated[NDArray[np.float32], VectorSchema(dtype=np.dtype(np.float32), size=384)]

# from_class is a coroutine, so call it from within an async function.
schema = await zvec.CollectionSchema.from_class(Doc, primary_key=["id"])
Scalar Python types map to zvec field types as follows:
| Python Type | zvec DataType |
|---|---|
| bool | BOOL |
| int | INT64 |
| float | DOUBLE |
| str | STRING |
| bytes | STRING (base64) |
| uuid.UUID | STRING |
| decimal.Decimal | STRING |
| datetime.date / time / datetime | STRING (ISO format) |
| datetime.timedelta | DOUBLE (total seconds) |
| list[str] / list[int] / list[float] / list[bool] | ARRAY_STRING / ARRAY_INT64 / ARRAY_DOUBLE / ARRAY_BOOL |
| other list, dict, nested structs | STRING (JSON) |
| NDArray (with vector schema) | VECTOR_FP32 (float32) or VECTOR_FP16 (float16) |
Scalar fields get an invert index by default so they can be used in query filters. The primary-key column maps to the document id and is not stored as a separate field.
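To make the string and numeric encodings in the table concrete, here is a standard-library sketch of the conversions it lists. encode_value is a hypothetical function, not the connector's encoder, and it dispatches on the runtime value for brevity, whereas the connector presumably decides from the declared field type.

```python
import base64
import datetime
import decimal
import json
import uuid
from typing import Any


def encode_value(value: Any) -> Any:
    """Convert a Python value to the stored form suggested by the table."""
    if isinstance(value, (bool, int, float, str)):
        return value  # BOOL / INT64 / DOUBLE / STRING are stored as-is
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")  # STRING (base64)
    if isinstance(value, (uuid.UUID, decimal.Decimal)):
        return str(value)  # STRING
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()  # STRING (ISO format)
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()  # DOUBLE (total seconds)
    if isinstance(value, list):
        kinds = {type(v) for v in value}
        if len(kinds) == 1 and kinds <= {str, int, float, bool}:
            return value  # homogeneous scalar list stays an array
    return json.dumps(value)  # other lists and dicts become STRING (JSON)


print(encode_value(b"hi"))                                     # aGk=
print(encode_value(datetime.date(2024, 1, 2)))                 # 2024-01-02
print(encode_value(datetime.timedelta(minutes=1, seconds=30)))  # 90.0
print(encode_value({"a": [1, 2]}))                             # {"a": [1, 2]}
```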
Override the scalar type, encoder, or indexing for a field:
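from dataclasses import dataclass
from typing import Annotated
import numpy as np
import zvec  # the zvec package itself, used here for its DataType enum
from numpy.typing import NDArray
from cocoindex.connectors.zvec import ZvecType
from cocoindex.resources.schema import VectorSchema

@dataclass
class MyRow:
    id: str
    # Store as INT32 instead of INT64, without a filter index.
    count: Annotated[int, ZvecType(zvec.DataType.INT32, indexed=False)]
    embedding: Annotated[NDArray[np.float32], VectorSchema(dtype=np.dtype(np.float32), size=384)]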
Mark a str field as full-text (FTS) indexed. The field is stored as a STRING field with a zvec FTS index, so you can run full-text match queries against it directly through zvec. Requires zvec >= 0.5.
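from dataclasses import dataclass
from typing import Annotated
import numpy as np
from numpy.typing import NDArray
from cocoindex.connectors.zvec import ZvecFtsType
from cocoindex.resources.schema import VectorSchema

@dataclass
class Doc:
    id: str
    # Full-text indexed: "standard" tokenizer, then the lowercase filter.
    body: Annotated[str, ZvecFtsType(tokenizer_name="standard", filters=("lowercase",))]
    embedding: Annotated[NDArray[np.float32], VectorSchema(dtype=np.dtype(np.float32), size=384)]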
ZvecFtsType options: tokenizer_name (e.g. "standard", "jieba"), filters (token filters applied after tokenization, default ("lowercase",)), and extra_params. The connector writes the field and its FTS index; querying it (for example with zvec.Query(field_name=..., fts=zvec.Fts(match_string=...))) happens directly against zvec, since this connector only handles the write path.
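The tokenizer-then-filters pipeline can be illustrated with a toy analyzer. This is a sketch only: the regular expression is a stand-in and does not reproduce zvec's "standard" tokenizer, and analyze and FILTERS are hypothetical names.

```python
import re
from typing import Callable, Dict, List, Sequence

# Hypothetical registry of token filters, looked up by name.
FILTERS: Dict[str, Callable[[str], str]] = {"lowercase": str.lower}


def analyze(text: str, filters: Sequence[str] = ("lowercase",)) -> List[str]:
    """Split text into word tokens, then apply each named filter in order."""
    tokens = re.findall(r"\w+", text)  # stand-in tokenizer: runs of word chars
    for name in filters:
        apply_filter = FILTERS[name]
        tokens = [apply_filter(token) for token in tokens]
    return tokens


print(analyze("Vector Databases, explained!"))
# ['vector', 'databases', 'explained']
```

With the lowercase filter in place, a match on "vector" would also find text written as "Vector"; with no filters the tokens keep their original case.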
A collection can declare multiple named vector fields, dense and sparse, in one schema. zvec supports querying across them with reranking at read time.
A NumPy ndarray field with a VectorSchema becomes a dense vector. The element dtype selects the zvec type: float32 → VECTOR_FP32, float16 → VECTOR_FP16. zvec's dense index only accepts these two; for smaller storage, keep a float32 vector and set quantize. Tune the HNSW index with ZvecVectorDef:
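from dataclasses import dataclass
from typing import Annotated
import numpy as np
from numpy.typing import NDArray
from cocoindex.connectors.zvec import ZvecVectorDef
from cocoindex.resources.schema import VectorSchema

@dataclass
class Doc:
    id: str
    embedding: Annotated[
        NDArray[np.float32],
        VectorSchema(dtype=np.dtype(np.float32), size=384),
        # Cosine metric with int8 quantization for smaller storage.
        ZvecVectorDef(metric="cosine", quantize="int8"),
    ]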
ZvecVectorDef options: metric ("cosine", "ip", "l2") and quantize ("none", "fp16", "int8", "int4").
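For readers unfamiliar with the three metric names, the textbook quantities behind them can be sketched in plain Python. These helper functions are illustrative only; how zvec turns them into scores (for example squared L2, or a distance rather than a similarity for cosine) is not covered here.

```python
import math
from typing import Sequence


def inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    # "ip": sum of element-wise products.
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    # "cosine": inner product of the two vectors after length normalization.
    # Assumes neither vector is all zeros.
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return inner_product(a, b) / (norm_a * norm_b)


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    # "l2": Euclidean distance between the two points.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


print(inner_product([1.0, 2.0], [3.0, 4.0]))      # 11.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
print(l2_distance([0.0, 0.0], [3.0, 4.0]))        # 5.0
```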
Mark a dict[int, float] field (mapping dimension → weight) as sparse with ZvecVectorDef(sparse=True):
@dataclass
class Doc:
    id: str
    sparse: Annotated[dict[int, float], ZvecVectorDef(sparse=True)]
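The dimension-to-weight mapping is simply a vector with its zero entries left out. As a small illustration, the hypothetical helper below converts a dense list of weights into that dict form; it is not part of the connector.

```python
from typing import Dict, List


def to_sparse(dense: List[float]) -> Dict[int, float]:
    """Keep only non-zero weights, keyed by their dimension index."""
    return {index: weight for index, weight in enumerate(dense) if weight != 0.0}


print(to_sparse([0.0, 0.5, 0.0, 1.25]))  # {1: 0.5, 3: 1.25}
```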
Complete example:

from dataclasses import dataclass
from typing import Annotated, Iterator

import cocoindex as coco
import numpy as np
from numpy.typing import NDArray

from cocoindex.connectors import zvec
from cocoindex.resources.schema import VectorSchema

ZVEC_DB = coco.ContextKey[zvec.ManagedConnection]("main_db")


@dataclass
class Doc:
    id: str
    title: str
    embedding: Annotated[
        NDArray[np.float32], VectorSchema(dtype=np.dtype(np.float32), size=384)
    ]


@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    # Keep the connection open for the app's lifetime; handles close on exit.
    with zvec.managed_connection("./zvec_data") as conn:
        builder.provide(ZVEC_DB, conn)
        yield


@coco.fn
async def index_docs(docs: list[Doc]) -> None:
    # Resolve the "docs" collection target, then declare one row per document.
    target = await zvec.mount_collection_target(
        ZVEC_DB,
        "docs",
        await zvec.CollectionSchema.from_class(Doc, primary_key=["id"]),
    )
    for doc in docs:
        target.declare_row(row=doc)