
Valkey connector



The valkey connector writes documents to Valkey and maintains vector similarity search indexes over them using the valkey-search module.

python
from cocoindex.connectors import valkey

:::note[Dependencies] This connector requires additional dependencies. Install with:

bash
pip install "cocoindex[valkey]"

Requires a Valkey server with the valkey-search module loaded. :::

Connection setup

create_client_config() creates a configuration for connecting to Valkey.

python
def create_client_config(
    host: str = "localhost",
    port: int = 6379,
    *,
    password: str | None = None,
    use_tls: bool = False,
    client_name: str | None = "cocoindex_vector_store",
    **kwargs: Any,
) -> GlideClientConfiguration

Parameters:

  • host — Valkey server host (default: "localhost").
  • port — Valkey server port (default: 6379).
  • password — Optional authentication password.
  • use_tls — Whether to use TLS for the connection (default: False).
  • client_name — Client name for the connection, visible in CLIENT LIST and monitoring dashboards (default: "cocoindex_vector_store"). Pass None to disable.
  • **kwargs — Additional keyword arguments passed to GlideClientConfiguration.

For advanced configurations not covered by these parameters, construct GlideClientConfiguration directly.

Returns: A GlideClientConfiguration instance.

Example:

python
from glide import GlideClient
from cocoindex.connectors import valkey

config = valkey.create_client_config("localhost", 6379)
client = await GlideClient.create(config)  # must run inside an async function
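To keep hosts and credentials out of source code, you can collect the create_client_config() arguments from environment variables. The sketch below assumes hypothetical variable names (VALKEY_HOST, VALKEY_PORT, VALKEY_USE_TLS, VALKEY_PASSWORD) and a hypothetical helper, valkey_settings_from_env; neither is part of the connector.

```python
import os
from typing import Any, Mapping


def valkey_settings_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Collect create_client_config() keyword arguments from the environment.

    The VALKEY_* variable names are a hypothetical convention for this sketch.
    """
    settings: dict[str, Any] = {
        "host": env.get("VALKEY_HOST", "localhost"),
        "port": int(env.get("VALKEY_PORT", "6379")),
        # Treat common truthy spellings as "use TLS".
        "use_tls": env.get("VALKEY_USE_TLS", "").lower() in {"1", "true", "yes"},
    }
    # Only pass a password when one is configured; the default is None.
    password = env.get("VALKEY_PASSWORD")
    if password:
        settings["password"] = password
    return settings
```

Usage: config = valkey.create_client_config(**valkey_settings_from_env()). Every key matches a documented parameter name, so the keyword-only password and use_tls arguments are passed correctly.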

As target

The valkey connector provides target state APIs for writing documents to search indexes. CocoIndex tracks what documents should exist and automatically handles upserts and deletions.
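Conceptually, the upsert and deletion bookkeeping amounts to comparing what was declared on the previous run with what is declared now. The sketch below is only a mental model, not CocoIndex's implementation: plan_changes is a hypothetical helper, and it assumes each document is summarized by a content fingerprint string.

```python
from typing import Mapping


def plan_changes(
    previous: Mapping[str, str],
    declared: Mapping[str, str],
) -> tuple[list[str], list[str]]:
    """Return (ids_to_upsert, ids_to_delete).

    Both mappings go from document ID to a content fingerprint.
    """
    # New IDs, or IDs whose fingerprint changed, need an upsert.
    upserts = sorted(
        doc_id for doc_id, fp in declared.items() if previous.get(doc_id) != fp
    )
    # IDs written on an earlier run but no longer declared get deleted.
    deletes = sorted(doc_id for doc_id in previous if doc_id not in declared)
    return upserts, deletes
```

Unchanged documents appear in neither list, which is why re-running a pipeline over the same input should not rewrite every key.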

Declaring target states

Setting up a connection

Create a ContextKey[GlideClient] to identify your Valkey client, then provide it in your lifespan:

:::note The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed indexes. See ContextKey as stable identity before renaming. :::

python
from typing import AsyncIterator

from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey

VALKEY_DB = coco.ContextKey[GlideClient]("my_valkey")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    config = valkey.create_client_config("localhost", 6379)
    client = await GlideClient.create(config)
    builder.provide(VALKEY_DB, client)
    yield
    await client.close()
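The lifespan is an async generator: code before yield runs once at startup, and code after it runs at shutdown. The stdlib-only sketch below models that pattern with contextlib.asynccontextmanager and a hypothetical FakeClient standing in for GlideClient; it is not CocoIndex's lifespan machinery. It wraps the yield in try/finally so the client is closed even if the body raises. Whether coco.lifespan propagates errors into the generator this way is not stated here, so treat that as a defensive assumption.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

events: list[str] = []


class FakeClient:
    """Hypothetical stand-in for GlideClient that records lifecycle events."""

    async def ping(self) -> None:
        events.append("ping")

    async def close(self) -> None:
        events.append("close")


@asynccontextmanager
async def lifespan() -> AsyncIterator[FakeClient]:
    client = FakeClient()
    events.append("open")
    try:
        yield client  # the application runs while the generator is suspended here
    finally:
        # Runs on normal shutdown and also when the body raises.
        await client.close()


async def main() -> None:
    async with lifespan() as client:
        await client.ping()


asyncio.run(main())
```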

Indexes (parent state)

Declares a search index as a target state. Returns an IndexTarget for declaring documents.

python
from cocoindex.connectorkits.target import ManagedBy

def declare_index_target(
    db: ContextKey[GlideClient],
    index_name: str,
    schema: IndexSchema,
    *,
    managed_by: ManagedBy = ManagedBy.SYSTEM,
) -> IndexTarget[coco.PendingS]

Parameters:

  • db — A ContextKey[GlideClient] identifying the Valkey client to use.
  • index_name — Name of the search index.
  • schema — Schema definition specifying vector configuration (see Index Schema).
  • managed_by — ManagedBy.SYSTEM (default): CocoIndex creates and drops the index automatically. ManagedBy.USER: assumes the index already exists and never drops it.

Returns: A pending IndexTarget. To obtain a resolved IndexTarget instead, use the convenience wrapper await valkey.mount_index_target(VALKEY_DB, index_name, schema).

Documents (child states)

Once an IndexTarget is resolved, declare documents to be upserted:

python
def IndexTarget.declare_document(
    self,
    document: valkey.Document,
) -> None

Parameters:

  • document — A valkey.Document containing:
    • id — Document ID (string)
    • vector — Vector data (list of floats or numpy array)
    • payload — Optional metadata dict (values must be str, int, or float)
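Because payload values are limited to str, int, or float, validating payloads before calling declare_document surfaces bad data early. The check_payload helper below is hypothetical and not part of the connector. It also rejects bool, because bool is a subclass of int in Python and the documented types do not mention it; that is a conservative choice, not documented connector behavior.

```python
from __future__ import annotations

from typing import Any, Mapping


def check_payload(payload: Mapping[str, Any] | None) -> dict[str, str | int | float]:
    """Validate payload values against the documented str/int/float rule."""
    if payload is None:
        return {}
    checked: dict[str, str | int | float] = {}
    for key, value in payload.items():
        # Exclude bool explicitly: isinstance(True, int) is True in Python.
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            raise TypeError(
                f"payload[{key!r}] has unsupported type {type(value).__name__}"
            )
        checked[key] = value
    return checked
```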

Index schema

Define vector configuration for an index using IndexSchema.

python
class IndexSchema:
    @classmethod
    async def create(
        cls,
        vectors: VectorDef,
        fields: list[FieldDef] | None = None,
    ) -> IndexSchema

Parameters:

  • vectors — A VectorDef specifying the vector field configuration.
  • fields — Optional list of FieldDef for indexed payload fields (enables search/filtering).

VectorDef

Specifies vector configuration including dimension, distance metric, and algorithm:

python
class VectorDef(NamedTuple):
    schema: VectorSchemaProvider | ContextKey[VectorSchemaProvider]
    distance: Literal["cosine", "l2", "ip"] = "cosine"
    algorithm: Literal["hnsw", "flat"] = "hnsw"

Parameters:

  • schema — A VectorSchemaProvider or ContextKey that defines vector dimensions.
  • distance — Distance metric for similarity search (default: "cosine").
  • algorithm — Vector index algorithm (default: "hnsw").
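In valkey-search, a vector field is declared in the FT.CREATE SCHEMA clause as the field name, VECTOR, the algorithm (HNSW or FLAT), a count of the attribute tokens that follow, and then the TYPE, DIM, and DISTANCE_METRIC attributes. The sketch below assumes the connector maps VectorDef onto that clause roughly like this; vector_field_args is a hypothetical helper, and the real connector may pass extra tuning attributes (for example HNSW parameters).

```python
from typing import Literal

# VectorDef literal values -> valkey-search keywords.
_DISTANCE = {"cosine": "COSINE", "l2": "L2", "ip": "IP"}
_ALGORITHM = {"hnsw": "HNSW", "flat": "FLAT"}


def vector_field_args(
    dim: int,
    distance: Literal["cosine", "l2", "ip"] = "cosine",
    algorithm: Literal["hnsw", "flat"] = "hnsw",
    field: str = "vector",
) -> list[str]:
    """Build the SCHEMA tokens for one FLOAT32 vector field."""
    attrs = ["TYPE", "FLOAT32", "DIM", str(dim), "DISTANCE_METRIC", _DISTANCE[distance]]
    # The number after the algorithm name counts the attribute tokens that follow.
    return [field, "VECTOR", _ALGORITHM[algorithm], str(len(attrs)), *attrs]
```

A full command for a 384-dimensional index would then be ["FT.CREATE", "documents", "ON", "HASH", "PREFIX", "1", "documents:", "SCHEMA", *vector_field_args(384)].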

VectorSchemaProvider

The schema field of VectorDef accepts a VectorSchemaProvider, a ContextKey, or an explicit VectorSchema to specify the vector dimension and dtype. See Vector Schema for details.

Distance metrics

The distance parameter in VectorDef specifies the similarity metric:

  • "cosine" — Cosine similarity (default)
  • "l2" — Euclidean distance (L2)
  • "ip" — Inner product (dot product)
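The three metrics reduce to simple formulas over two equal-length vectors. This pure-Python sketch shows the math only; it is not how valkey-search computes scores, and search engines commonly report a distance rather than a similarity (for cosine, often 1 minus the similarity), so check the valkey-search documentation for the exact score semantics.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Angle-based similarity in [-1, 1]; ignores vector magnitude."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance; smaller means closer."""
    return math.dist(a, b)


def inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product; larger means more similar, and it is sensitive to magnitude."""
    return sum(x * y for x, y in zip(a, b))
```

For unit-normalized embeddings, inner product and cosine similarity give the same value, which is why "ip" is often used with normalized models.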

Index algorithms

The algorithm parameter specifies the vector indexing strategy:

  • "hnsw" — Hierarchical Navigable Small World graph (default, best for most use cases)
  • "flat" — Brute-force flat index (exact results, slower for large datasets)
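A flat index answers a query by scoring every stored vector, which is why its results are exact but its cost grows linearly with the number of documents. The sketch below illustrates those semantics with L2 distance; flat_knn is a hypothetical helper, not valkey-search's implementation. HNSW instead walks a graph of neighbors and can miss some true nearest neighbors in exchange for much faster queries.

```python
import heapq
import math
from typing import Mapping, Sequence


def flat_knn(
    query: Sequence[float],
    vectors: Mapping[str, Sequence[float]],
    k: int,
) -> list[tuple[str, float]]:
    """Exact k-nearest neighbours by L2 distance, scanning every vector."""
    scored = ((doc_id, math.dist(query, vec)) for doc_id, vec in vectors.items())
    # nsmallest keeps only the k lowest distances instead of sorting everything.
    return heapq.nsmallest(k, scored, key=lambda item: item[1])
```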

Data storage

Documents are stored as Valkey HASH keys with a prefix pattern:

  • Key format: {index_name}:{document_id}
  • Vector field: stored as binary float32 blob in a vector hash field
  • Payload fields: stored as individual hash fields (string values)

The search index is created with FT.CREATE {index_name} ON HASH PREFIX 1 {index_name}: so that every HASH key carrying the matching prefix is indexed automatically.
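The storage layout above can be sketched as the key and field map a single document becomes. This is an approximation for illustration, not the connector's code: to_hash_entry and decode_vector are hypothetical helpers, and the sketch assumes the vector field is named vector (matching @vector in the query example) and is encoded as little-endian float32. With a connected GlideClient, await client.hgetall(key) lets you inspect what was actually written.

```python
from __future__ import annotations

import struct
from typing import Mapping, Sequence


def to_hash_entry(
    index_name: str,
    doc_id: str,
    vector: Sequence[float],
    payload: Mapping[str, str | int | float] | None = None,
) -> tuple[str, dict[str, bytes | str]]:
    """Return (key, fields) for one document, following the documented layout."""
    key = f"{index_name}:{doc_id}"
    # float32 blob, 4 bytes per component, little-endian ("<").
    fields: dict[str, bytes | str] = {"vector": struct.pack(f"<{len(vector)}f", *vector)}
    # Payload values become string hash fields.
    for name, value in (payload or {}).items():
        fields[name] = str(value)
    return key, fields


def decode_vector(blob: bytes) -> list[float]:
    """Inverse of the packing above: 4 bytes per float32 component."""
    return list(struct.unpack(f"<{len(blob) // 4}f", blob))
```

Note that float32 storage rounds values that are not exactly representable, so a decoded vector may differ slightly from the float64 input.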

Example

python
from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from typing import AsyncIterator

VALKEY_DB = coco.ContextKey[GlideClient]("main_vectors")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    config = valkey.create_client_config("localhost", 6379)
    client = await GlideClient.create(config)
    builder.provide(VALKEY_DB, client)
    builder.provide(EMBEDDER, SentenceTransformerEmbedder("all-MiniLM-L6-v2"))
    yield
    await client.close()

@coco.fn
async def process_document(
    doc_id: str,
    text: str,
    target: valkey.IndexTarget,
) -> None:
    embedder = coco.use_context(EMBEDDER)
    embedding = await embedder.embed(text)

    target.declare_document(valkey.Document(
        id=doc_id,
        vector=embedding.tolist(),
        payload={"text": text},
    ))

@coco.fn
async def app_main() -> None:
    embedder = coco.use_context(EMBEDDER)

    index = await valkey.mount_index_target(
        VALKEY_DB,
        "documents",
        await valkey.IndexSchema.create(
            vectors=valkey.VectorDef(schema=embedder, distance="cosine"),
        ),
    )

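    # Placeholder input: replace with (doc_id, text) pairs from your own source.
    documents = [
        ("doc-1", "Valkey is an open-source key-value data store."),
        ("doc-2", "CocoIndex keeps target states in sync with their sources."),
    ]

    for doc_id, text in documents: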
        await coco.mount(
            coco.component_subpath("doc", doc_id),
            process_document,
            doc_id,
            text,
            index,
        )

The connector focuses on writing documents to Valkey. For vector search queries, use the Valkey GLIDE client directly:

python
import struct

from glide import FtSearchOptions, ft

# Assumes `client` is a connected GlideClient and `embedding` is the query
# vector (a list of floats) with the same dimension as the index.
query_vec = struct.pack(f"<{len(embedding)}f", *embedding)

# KNN query: the 10 nearest neighbours of $query_vec in the "vector" field
query = "*=>[KNN 10 @vector $query_vec AS score]"

results = await ft.search(
    client,
    "documents",
    query,
    options=FtSearchOptions(params={"query_vec": query_vec}),
)
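Factoring the query string and vector encoding into small helpers keeps queries consistent with how vectors are stored and catches dimension mismatches before they reach the server. knn_query and pack_query_vector below are hypothetical helpers; the float32 little-endian encoding and the vector field name mirror the example above.

```python
import struct
from typing import Sequence


def knn_query(k: int, field: str = "vector", param: str = "query_vec") -> str:
    """Build a pure KNN query string over every document in the index."""
    if k <= 0:
        raise ValueError("k must be a positive integer")
    return f"*=>[KNN {k} @{field} ${param} AS score]"


def pack_query_vector(vector: Sequence[float], dim: int) -> bytes:
    """Encode a query vector as little-endian float32, checking its dimension."""
    if len(vector) != dim:
        raise ValueError(f"expected {dim} components, got {len(vector)}")
    return struct.pack(f"<{dim}f", *vector)
```

With all-MiniLM-L6-v2 (384 dimensions), the search call would pass options=FtSearchOptions(params={"query_vec": pack_query_vector(embedding, 384)}) and knn_query(10) as the query.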