docs/src/content/docs/connectors/turbopuffer.mdx
The turbopuffer connector provides utilities for writing rows to Turbopuffer namespaces, with support for both single and named vectors.
from cocoindex.connectors import turbopuffer
:::note[Dependencies]
This connector requires additional dependencies. Install with:
pip install "cocoindex[turbopuffer]"
:::
Turbopuffer uses a single client object that owns the API key and region. Construct one using AsyncTurbopuffer:
import os

from cocoindex.connectors import turbopuffer

client = turbopuffer.AsyncTurbopuffer(
    region="gcp-us-central1",
    api_key=os.environ["TURBOPUFFER_API_KEY"],  # raises KeyError if unset
)
turbopuffer.AsyncTurbopuffer is re-exported from the Turbopuffer Python SDK; importing it directly via from turbopuffer import AsyncTurbopuffer works too.
The turbopuffer connector provides target state APIs for writing rows to namespaces. CocoIndex tracks what rows should exist and automatically handles upserts and deletions. Turbopuffer creates namespaces implicitly on the first write, so there is no separate "create namespace" step — but the connector still tracks namespace-level configuration (vector schema and distance metric) and clears the namespace if it must be rebuilt.
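To make the target-state idea concrete, here is a minimal, self-contained sketch of the reconciliation it implies: compare the rows declared in this run with the rows recorded from the previous run, then derive the upserts and deletions. It is an illustration only, not CocoIndex's implementation; `Plan`, `plan_sync`, and the dict-based row and schema representations are hypothetical names chosen for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Plan:
    """Hypothetical result type: the work one sync pass would perform."""
    rebuild: bool             # True when the namespace must be cleared first
    upserts: dict[str, dict]  # row id -> row payload to write
    deletes: list[str]        # row ids that should no longer exist


def plan_sync(
    prev_schema: Optional[dict],
    new_schema: dict,
    prev_rows: dict[str, dict],
    new_rows: dict[str, dict],
) -> Plan:
    """Derive the writes needed to move from the previous state to the new one."""
    # A changed namespace-level configuration invalidates every stored row,
    # so this sketch clears the namespace and rewrites everything.
    if prev_schema is not None and prev_schema != new_schema:
        return Plan(rebuild=True, upserts=dict(new_rows), deletes=[])

    # Otherwise write only rows that are new or whose payload changed ...
    upserts = {rid: row for rid, row in new_rows.items() if prev_rows.get(rid) != row}
    # ... and delete rows that were declared before but are absent now.
    deletes = sorted(rid for rid in prev_rows if rid not in new_rows)
    return Plan(rebuild=False, upserts=upserts, deletes=deletes)


schema = {"vector": "[3]f32", "distance": "cosine_distance"}
previous = {"a": {"vector": [1.0, 0.0, 0.0]}, "b": {"vector": [0.0, 1.0, 0.0]}}
current = {"a": {"vector": [1.0, 0.0, 0.0]}, "c": {"vector": [0.0, 0.0, 1.0]}}

plan = plan_sync(schema, schema, previous, current)
print(plan.upserts)  # only row "c" is written; row "a" is unchanged
print(plan.deletes)  # row "b" is no longer declared, so it is deleted
```

The practical consequence is that application code only declares the rows that should exist; it never issues deletes itself.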
Create a ContextKey[AsyncTurbopuffer] to identify your client, then provide it in your lifespan:
:::note
The key name is load-bearing across runs: it is the stable identity CocoIndex uses to track managed namespaces. See ContextKey as stable identity before renaming.
:::
import os
from typing import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import turbopuffer

# The key name ("my_vectors") is the stable identity CocoIndex tracks across runs.
TPUF = coco.ContextKey[turbopuffer.AsyncTurbopuffer]("my_vectors")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    client = turbopuffer.AsyncTurbopuffer(
        region="gcp-us-central1",
        api_key=os.environ["TURBOPUFFER_API_KEY"],
    )
    builder.provide(TPUF, client)  # make the client available under TPUF
    yield
Declares a namespace as a target state. Returns a NamespaceTarget for declaring rows.
def declare_namespace_target(
    db: ContextKey[AsyncTurbopuffer],
    namespace_name: str,
    schema: NamespaceSchema,
    *,
    managed_by: Literal["system", "user"] = "system",
) -> NamespaceTarget[coco.PendingS]
Parameters:
- db: A ContextKey[AsyncTurbopuffer] identifying the client to use.
- namespace_name: Name of the namespace.
- schema: Schema definition specifying vector configuration and distance metric (see Namespace schema).
- managed_by: Whether CocoIndex manages the namespace lifecycle ("system") or assumes it exists ("user").

Returns: A pending NamespaceTarget. Use the convenience wrapper await turbopuffer.mount_namespace_target(TPUF, namespace_name, schema) to resolve.
Once a NamespaceTarget is resolved, declare rows to be upserted using turbopuffer.Row:
def NamespaceTarget.declare_row(
    self,
    row: turbopuffer.Row,
) -> None
Row is a small dataclass:
@dataclass
class Row:
    id: str | int
    vector: Sequence[float] | np.ndarray | dict[str, Sequence[float] | np.ndarray]
    attributes: dict[str, Any] | None = None
- id: Document id (string or integer).
- vector: For an unnamed-vector schema, pass a single sequence. For a named-vectors schema, pass a dict mapping vector field name to its sequence.
- attributes: Non-vector attributes (text, tags, metadata, etc.). Turbopuffer infers attribute types from the data.

Define vector configuration and distance metric for a namespace using NamespaceSchema:
class NamespaceSchema:
    @classmethod
    async def create(
        cls,
        vectors: VectorDef | dict[str, VectorDef],
        *,
        distance: Literal["cosine_distance", "euclidean_squared"] = "cosine_distance",
    ) -> NamespaceSchema
Parameters:
- vectors: Either:
  - A single VectorDef for an unnamed vector (stored under turbopuffer's default "vector" field).
  - A dict[str, VectorDef] mapping field names to definitions, for named vectors.
- distance: Distance metric applied to all vector columns in the namespace. Turbopuffer applies a single distance metric per namespace.

VectorDef specifies a vector field's dimension and dtype:
class VectorDef(NamedTuple):
    schema: VectorSchemaProvider | ContextKey[VectorSchemaProvider]
The schema field accepts a VectorSchemaProvider, a ContextKey, or an explicit VectorSchema. The dtype on the VectorSchema (must be np.float32 or np.float16) controls turbopuffer's vector type — [N]f32 or [N]f16.
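As a rough illustration of how dimension and dtype combine into the type strings mentioned above, the following sketch builds `[N]f32` and `[N]f16` from a size and a dtype name. `vector_type` and `_ELEMENT_TYPES` are hypothetical helpers written for this example, not part of the connector, and dtype names are passed as plain strings (the value of `np.dtype(np.float32).name`, for instance) so the snippet runs without NumPy.

```python
# Assumed mapping from NumPy dtype names to turbopuffer vector element types.
_ELEMENT_TYPES = {"float32": "f32", "float16": "f16"}


def vector_type(size: int, dtype_name: str) -> str:
    """Return the turbopuffer-style type string for a vector of `size` elements."""
    if size <= 0:
        raise ValueError(f"vector size must be positive, got {size}")
    try:
        element = _ELEMENT_TYPES[dtype_name]
    except KeyError:
        # Only float32 and float16 are accepted, matching the constraint above.
        raise ValueError(
            f"unsupported dtype {dtype_name!r}; expected float32 or float16"
        ) from None
    return f"[{size}]{element}"


print(vector_type(384, "float32"))  # [384]f32
print(vector_type(512, "float16"))  # [512]f16
```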
For namespaces with a single unnamed vector:
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
schema = await turbopuffer.NamespaceSchema.create(
    vectors=turbopuffer.VectorDef(schema=embedder),
)
Rows pass the vector directly:
target.declare_row(turbopuffer.Row(
    id="doc-123",
    vector=embedding,
    attributes={"text": "...", "tags": ["a", "b"]},
))
Namespaces can have multiple named vector columns (turbopuffer supports up to two per namespace). The name "id" is reserved for the row id and cannot be used as a vector field name.
from cocoindex.resources.schema import VectorSchema
import numpy as np

schema = await turbopuffer.NamespaceSchema.create(
    vectors={
        "text_embedding": turbopuffer.VectorDef(
            schema=VectorSchema(dtype=np.float32, size=384),
        ),
        "image_embedding": turbopuffer.VectorDef(
            schema=VectorSchema(dtype=np.float32, size=512),
        ),
    },
    distance="cosine_distance",
)
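For intuition about what the distance argument selects, here is a small pure-Python sketch of the two metrics: cosine distance taken as 1 minus cosine similarity, and squared Euclidean distance as the sum of squared differences. Turbopuffer computes these server-side; `cosine_distance` and `euclidean_squared` below are hypothetical helper functions named after the metric strings, written only to show the arithmetic.

```python
import math
from collections.abc import Sequence


def _require_same_length(a: Sequence[float], b: Sequence[float]) -> None:
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity; 0 for parallel vectors, 1 for orthogonal ones."""
    _require_same_length(a, b)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


def euclidean_squared(a: Sequence[float], b: Sequence[float]) -> float:
    """Sum of squared component differences (no square root taken)."""
    _require_same_length(a, b)
    return sum((x - y) ** 2 for x, y in zip(a, b))


orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])
gap = euclidean_squared([0.0, 0.0], [3.0, 4.0])
```

With both metrics, a smaller value means a closer match. Cosine distance ignores vector magnitude, while squared Euclidean distance does not.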
Rows pass a dict of vectors:
target.declare_row(turbopuffer.Row(
    id="doc-123",
    vector={
        "text_embedding": text_vec,
        "image_embedding": image_vec,
    },
    attributes={"title": "..."},
))
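Because the shape of `Row.vector` has to match the schema (a single sequence for an unnamed vector, a dict for named vectors), it can help to check rows before declaring them. The sketch below is one way to do that in plain Python. `check_row_vector` and `Dims` are hypothetical names, and the rule that every declared field must be present in the dict is an assumption of this example; the connector's own validation may differ.

```python
from collections.abc import Mapping, Sequence
from typing import Union

# int: dimension of the single unnamed vector; mapping: field name -> dimension.
Dims = Union[int, Mapping[str, int]]


def _check_len(name: str, vec: Sequence[float], dim: int) -> None:
    if len(vec) != dim:
        raise ValueError(f"{name}: expected {dim} dimensions, got {len(vec)}")


def check_row_vector(vector, dims: Dims) -> None:
    """Raise ValueError if `vector` does not fit the declared dimensions."""
    if isinstance(dims, int):
        # Unnamed-vector schema: the row must carry one plain sequence.
        if isinstance(vector, Mapping):
            raise ValueError("unnamed-vector schema expects a sequence, got a dict")
        _check_len("vector", vector, dims)
        return

    # Named-vectors schema: "id" is reserved and cannot name a vector field.
    if "id" in dims:
        raise ValueError('"id" cannot be used as a vector field name')
    if not isinstance(vector, Mapping):
        raise ValueError("named-vectors schema expects a dict of vectors")
    # Assumption: the row supplies exactly the declared fields.
    if set(vector) != set(dims):
        raise ValueError(f"expected fields {sorted(dims)}, got {sorted(vector)}")
    for name, dim in dims.items():
        _check_len(name, vector[name], dim)


named_dims = {"text_embedding": 384, "image_embedding": 512}
check_row_vector([0.0] * 384, 384)
check_row_vector(
    {"text_embedding": [0.0] * 384, "image_embedding": [0.0] * 512},
    named_dims,
)
```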
Turbopuffer applies a single distance_metric per namespace. Supported values:

- "cosine_distance": Cosine distance (default).
- "euclidean_squared": Squared Euclidean distance.

A complete example that embeds documents and writes them to a namespace:

from typing import AsyncIterator
import os

import cocoindex as coco
from cocoindex.connectors import turbopuffer
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

TPUF = coco.ContextKey[turbopuffer.AsyncTurbopuffer]("main_vectors")
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    # Build the client once and register it under the TPUF key.
    client = turbopuffer.AsyncTurbopuffer(
        region="gcp-us-central1",
        api_key=os.environ["TURBOPUFFER_API_KEY"],
    )
    builder.provide(TPUF, client)
    yield


@coco.fn
async def process_document(
    doc_id: str,
    text: str,
    target: turbopuffer.NamespaceTarget,
) -> None:
    # Embed the text and declare the row that should exist for this document.
    embedding = await embedder.embed(text)
    target.declare_row(turbopuffer.Row(
        id=doc_id,
        vector=embedding,
        attributes={"text": text},
    ))


@coco.fn
async def app_main() -> None:
    namespace = await turbopuffer.mount_namespace_target(
        TPUF,
        "documents",
        await turbopuffer.NamespaceSchema.create(
            vectors=turbopuffer.VectorDef(schema=embedder),
        ),
    )
    # `documents` is not defined in this snippet; it stands for your own
    # iterable of (doc_id, text) pairs.
    for doc_id, text in documents:
        await coco.mount(
            coco.component_subpath("doc", doc_id),
            process_document,
            doc_id,
            text,
            namespace,
        )
Turbopuffer rows are identified by str or int. UUIDs should be passed as strings.
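A minimal sketch of that normalization, using only the standard library: `to_row_id` is a hypothetical helper (not part of the connector) that converts `uuid.UUID` values to their canonical string form and passes `str` and `int` ids through unchanged.

```python
import uuid
from typing import Union


def to_row_id(value: Union[str, int, uuid.UUID]) -> Union[str, int]:
    """Return an id suitable for Row.id: UUIDs become strings, str/int pass through."""
    if isinstance(value, uuid.UUID):
        return str(value)  # canonical lowercase, hyphenated form
    if isinstance(value, (str, int)):
        return value
    raise TypeError(f"unsupported row id type: {type(value).__name__}")


doc_uuid = uuid.UUID("12345678-1234-5678-1234-567812345678")
print(to_row_id(doc_uuid))  # 12345678-1234-5678-1234-567812345678
print(to_row_id(42))        # 42
```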
Row attributes are schemaless; turbopuffer infers attribute types from the values you write. Supported scalar types include string, int, uint, float, bool, uuid, and datetime, plus their array variants. See Turbopuffer's schema reference for the full list.
Reserved attribute names depend on the schema; putting any reserved name in Row.attributes raises a ValueError:
- id is always reserved: it is the row id.
- vector is also reserved (it is the wire-level vector field).

The connector focuses on writing rows. For vector search, use the turbopuffer client directly:
ns = client.namespace("documents")
result = await ns.query(
    rank_by=("vector", "ANN", query_embedding.tolist()),
    top_k=10,
)