docs/src/content/docs/connectors/valkey.mdx
The valkey connector provides utilities for writing documents to Valkey with vector search support, using the valkey-search module for similarity search indexes.
from cocoindex.connectors import valkey
:::note[Dependencies] This connector requires additional dependencies. Install with:
pip install cocoindex[valkey]
Requires a Valkey server with the valkey-search module loaded. :::
create_client_config() creates a configuration for connecting to Valkey.
def create_client_config(
host: str = "localhost",
port: int = 6379,
*,
password: str | None = None,
use_tls: bool = False,
client_name: str | None = "cocoindex_vector_store",
**kwargs: Any,
) -> GlideClientConfiguration
Parameters:
host — Valkey server host (default: "localhost").port — Valkey server port (default: 6379).password — Optional authentication password.use_tls — Whether to use TLS for the connection (default: False).client_name — Client name for the connection, visible in CLIENT LIST and monitoring dashboards (default: "cocoindex_vector_store"). Pass None to disable.**kwargs — Additional keyword arguments passed to GlideClientConfiguration.For advanced configurations not covered by these parameters, construct GlideClientConfiguration directly.
Returns: A GlideClientConfiguration instance.
Example:
from glide import GlideClient
config = valkey.create_client_config("localhost", 6379)
client = await GlideClient.create(config)
The valkey connector provides target state APIs for writing documents to search indexes. CocoIndex tracks what documents should exist and automatically handles upserts and deletions.
Create a ContextKey[GlideClient] to identify your Valkey client, then provide it in your lifespan:
:::note The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed indexes. See ContextKey as stable identity before renaming. :::
from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey
VALKEY_DB = coco.ContextKey[GlideClient]("my_valkey")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
config = valkey.create_client_config("localhost", 6379)
client = await GlideClient.create(config)
builder.provide(VALKEY_DB, client)
yield
await client.close()
Declares a search index as a target state. Returns an IndexTarget for declaring documents.
from cocoindex.connectorkits.target import ManagedBy
def declare_index_target(
db: ContextKey[GlideClient],
index_name: str,
schema: IndexSchema,
*,
managed_by: ManagedBy = ManagedBy.SYSTEM,
) -> IndexTarget[coco.PendingS]
Parameters:
db — A ContextKey[GlideClient] identifying the Valkey client to use.index_name — Name of the search index.schema — Schema definition specifying vector configuration (see Index Schema).managed_by — ManagedBy.SYSTEM (default): CocoIndex creates and drops the index automatically. ManagedBy.USER: assumes the index already exists and never drops it.Returns: A pending IndexTarget. Use the convenience wrapper await valkey.mount_index_target(VALKEY_DB, index_name, schema) to resolve.
Once an IndexTarget is resolved, declare documents to be upserted:
def IndexTarget.declare_document(
self,
document: valkey.Document,
) -> None
Parameters:
document — A valkey.Document containing:
id — Document ID (string)vector — Vector data (list of floats or numpy array)payload — Optional metadata dict (values must be str, int, or float)Define vector configuration for an index using IndexSchema.
class IndexSchema:
@classmethod
async def create(
cls,
vectors: VectorDef,
fields: list[FieldDef] | None = None,
) -> IndexSchema
Parameters:
vectors — A VectorDef specifying the vector field configuration.fields — Optional list of FieldDef for indexed payload fields (enables search/filtering).Specifies vector configuration including dimension, distance metric, and algorithm:
class VectorDef(NamedTuple):
schema: VectorSchemaProvider | ContextKey[VectorSchemaProvider]
distance: Literal["cosine", "l2", "ip"] = "cosine"
algorithm: Literal["hnsw", "flat"] = "hnsw"
Parameters:
schema — A VectorSchemaProvider or ContextKey that defines vector dimensions.distance — Distance metric for similarity search (default: "cosine").algorithm — Vector index algorithm (default: "hnsw").The schema field of VectorDef accepts a VectorSchemaProvider, a ContextKey, or an explicit VectorSchema to specify the vector dimension and dtype. See Vector Schema for details.
The distance parameter in VectorDef specifies the similarity metric:
"cosine" — Cosine similarity (default)"l2" — Euclidean distance (L2)"ip" — Inner product (dot product)The algorithm parameter specifies the vector indexing strategy:
"hnsw" — Hierarchical Navigable Small World graph (default, best for most use cases)"flat" — Brute-force flat index (exact results, slower for large datasets)Documents are stored as Valkey HASH keys with a prefix pattern:
{index_name}:{document_id}vector hash fieldThe search index is created with FT.CREATE ON HASH PREFIX 1 {index_name}: to automatically index all documents with the matching prefix.
from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from typing import AsyncIterator
VALKEY_DB = coco.ContextKey[GlideClient]("main_vectors")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
config = valkey.create_client_config("localhost", 6379)
client = await GlideClient.create(config)
builder.provide(VALKEY_DB, client)
builder.provide(EMBEDDER, SentenceTransformerEmbedder("all-MiniLM-L6-v2"))
yield
await client.close()
@coco.fn
async def process_document(
doc_id: str,
text: str,
target: valkey.IndexTarget,
) -> None:
embedder = coco.use_context(EMBEDDER)
embedding = await embedder.embed(text)
target.declare_document(valkey.Document(
id=doc_id,
vector=embedding.tolist(),
payload={"text": text},
))
@coco.fn
async def app_main() -> None:
embedder = coco.use_context(EMBEDDER)
index = await valkey.mount_index_target(
VALKEY_DB,
"documents",
await valkey.IndexSchema.create(
vectors=valkey.VectorDef(schema=embedder, distance="cosine"),
),
)
for doc_id, text in documents:
await coco.mount(
coco.component_subpath("doc", doc_id),
process_document,
doc_id,
text,
index,
)
The connector focuses on writing documents to Valkey. For vector search queries, use the Valkey GLIDE client directly:
from glide import GlideClient
from glide.async_commands import ft
from glide.async_commands.ft import FtSearchOptions
import struct
# Build a KNN query
query_vec = struct.pack(f"<{len(embedding)}f", *embedding)
query = f"*=>[KNN 10 @vector $query_vec AS score]"
results = await ft.search(
client,
"documents",
query,
options=FtSearchOptions(params={"query_vec": query_vec}),
)