docs/src/content/docs/connectors/doris.mdx
The doris connector provides utilities for writing rows to Apache Doris databases, with support for vector indexes (HNSW, IVF) and inverted indexes for full-text search.

```python
from cocoindex.connectors import doris
```
:::note[Dependencies]
This connector requires additional dependencies. Install with:

```sh
pip install cocoindex[doris]
```
:::
Configure the connection to your Doris cluster:
```python
from cocoindex.connectors import doris

config = doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
    fe_http_port=8080,
    query_port=9030,
    username="root",
    password="",
)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fe_host | str | (required) | Frontend host address |
| database | str | (required) | Database name |
| fe_http_port | int | 8080 | Frontend HTTP port (for stream load) |
| query_port | int | 9030 | MySQL-compatible query port |
| username | str | "root" | Username |
| password | str | "" | Password |
| enable_https | bool | False | Use HTTPS for stream load |
| be_load_host | str \| None | None | Override backend host for stream load (defaults to fe_host) |
| batch_size | int | 10000 | Max rows per stream load batch |
| stream_load_timeout | int | 600 | Timeout (seconds) for stream load |
| replication_num | int | 1 | Replication factor for new tables |
| buckets | int \| str | "auto" | Bucket count for new tables |
Create a managed connection:
```python
def connect(config: DorisConnectionConfig) -> ManagedConnection
```
Example:
```python
conn = doris.connect(doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
))
```
The doris connector provides target state APIs for writing rows to tables. CocoIndex tracks what rows should exist and automatically handles upserts and deletions via Doris stream load.
Create a ContextKey[doris.ManagedConnection] to identify your connection, then provide it in your lifespan:
:::note
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.
:::
```python
from typing import Iterator

import cocoindex as coco
from cocoindex.connectors import doris

DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
    # conn is cleaned up after yield
```
declare_table_target declares a table as a target state and returns a DorisTableTarget for declaring rows:
```python
def declare_table_target(
    db: ContextKey[ManagedConnection],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    managed_by: Literal["system", "user"] = "system",
    vector_indexes: list[VectorIndexDef] | None = None,
    inverted_indexes: list[InvertedIndexDef] | None = None,
) -> DorisTableTarget[RowT, coco.PendingS]
```
Parameters:
- db — A ContextKey[doris.ManagedConnection] identifying the connection to use.
- table_name — Name of the table.
- table_schema — Schema definition including columns and primary key (see Table schema).
- managed_by — Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").
- vector_indexes — Optional list of vector index definitions (see Vector indexes).
- inverted_indexes — Optional list of inverted index definitions (see Inverted indexes).

Returns: A pending DorisTableTarget. Use the convenience wrapper await doris.mount_table_target(...) to resolve.
Once a DorisTableTarget is resolved, declare rows to be upserted:
```python
def DorisTableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
```
Parameters:
- row — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.

Define the table structure using a Python class:
```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    primary_key: list[str],
    *,
    column_overrides: dict[str, DorisType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
Parameters:
- record_type — A record type whose fields define table columns.
- primary_key — List of column names forming the primary key.
- column_overrides — Optional per-column overrides for type mapping or vector configuration.

Example:
```python
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]

schema = await doris.TableSchema.from_class(
    DocEmbedding,
    primary_key=["id"],
)
```
Python types are automatically mapped to Doris types:
| Python Type | Doris Type |
|---|---|
| bool | BOOLEAN |
| int | BIGINT |
| float | DOUBLE |
| str | STRING |
| bytes | STRING (base64) |
| uuid.UUID | VARCHAR(36) |
| datetime.datetime | DATETIME |
| datetime.date | DATE |
| list, dict, nested structs | JSON |
| NDArray (with vector schema) | `ARRAY<FLOAT>` |
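The scalar defaults in the table above can be sketched as a plain lookup. This is an illustrative reconstruction, not the connector's actual implementation, and the function name is hypothetical:

```python
import datetime
import uuid

# Default Python-to-Doris type mapping, mirroring the table above.
_DORIS_TYPE_MAP: dict[type, str] = {
    bool: "BOOLEAN",
    int: "BIGINT",
    float: "DOUBLE",
    str: "STRING",
    bytes: "STRING",          # byte values are base64-encoded on write
    uuid.UUID: "VARCHAR(36)",
    datetime.datetime: "DATETIME",
    datetime.date: "DATE",
}

def doris_type_for(py_type: type) -> str:
    """Return the default Doris column type for a Python field type."""
    # list, dict, and nested structs all fall through to JSON
    return _DORIS_TYPE_MAP.get(py_type, "JSON")

print(doris_type_for(int))   # BIGINT
print(doris_type_for(dict))  # JSON
```

NDArray columns are the exception: they map to `ARRAY<FLOAT>` only when a vector schema is attached (e.g., via an embedder annotation or column_overrides).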
Use DorisType to specify a custom Doris type:
```python
from typing import Annotated
from dataclasses import dataclass

from cocoindex.connectors.doris import DorisType

@dataclass
class MyRow:
    id: Annotated[int, DorisType("INT")]
    value: Annotated[float, DorisType("FLOAT")]
```
Doris supports vector similarity search via HNSW and IVF indexes. Define them with VectorIndexDef:
```python
from cocoindex.connectors.doris import VectorIndexDef

vector_idx = VectorIndexDef(
    field_name="embedding",
    index_type="HNSW",  # or "IVF"
    metric_type="l2_distance",  # or "cosine_distance"
)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| field_name | str | (required) | Column to index |
| index_type | str | "HNSW" | Index type: "HNSW" or "IVF" |
| metric_type | str | "l2_distance" | Distance metric: "l2_distance" or "cosine_distance" |
| max_degree | int \| None | None | HNSW max degree |
| ef_construction | int \| None | None | HNSW construction parameter |
| nlist | int \| None | None | IVF number of partitions |
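The tuning parameters only apply to the matching index type. A hedged sketch with illustrative values (these are placeholders, not recommendations):

```python
from cocoindex.connectors.doris import VectorIndexDef

# HNSW index with explicit graph-construction parameters
hnsw_idx = VectorIndexDef(
    field_name="embedding",
    index_type="HNSW",
    metric_type="cosine_distance",
    max_degree=32,        # max edges per node in the HNSW graph
    ef_construction=200,  # candidate list size during index build
)

# IVF index partitioned into 256 inverted lists
ivf_idx = VectorIndexDef(
    field_name="embedding",
    index_type="IVF",
    metric_type="l2_distance",
    nlist=256,
)
```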
Doris supports inverted indexes for full-text search. Define them with InvertedIndexDef:
```python
from cocoindex.connectors.doris import InvertedIndexDef

inverted_idx = InvertedIndexDef(
    field_name="text",
    parser="unicode",  # or "english", "chinese", etc.
)
```
Parameters:
- field_name — Column to index.
- parser — Optional tokenizer for full-text search (e.g., "unicode", "english", "chinese"). If None, the index supports exact matching only.

Build a vector similarity search SQL query:
```python
def build_vector_search_query(
    table: str,
    vector_field: str,
    query_vector: list[float],
    metric: str = "l2_distance",
    limit: int = 10,
    select_columns: list[str] | None = None,
    where_clause: str | None = None,
) -> str
```
Example:
```python
sql = doris.build_vector_search_query(
    table="doc_embeddings",
    vector_field="embedding",
    query_vector=query_vec.tolist(),
    metric="cosine_distance",
    limit=5,
)
```
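The generated statement is roughly of the following shape. This simplified builder is re-implemented in plain Python to show the structure: l2_distance and cosine_distance are real Doris functions, but the connector's exact SQL may differ (e.g., it may use approximate variants to hit the index):

```python
def sketch_vector_search_sql(
    table: str,
    vector_field: str,
    query_vector: list[float],
    metric: str = "l2_distance",
    limit: int = 10,
) -> str:
    """Simplified illustration of a vector search statement (not the real builder)."""
    vec = "[" + ", ".join(str(v) for v in query_vector) + "]"
    return (
        f"SELECT *, {metric}({vector_field}, {vec}) AS distance "
        f"FROM {table} ORDER BY distance ASC LIMIT {limit}"
    )

print(sketch_vector_search_sql("doc_embeddings", "embedding", [0.1, 0.2], "cosine_distance", 5))
```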
Create an async MySQL connection for running queries:
```python
async def connect_async(
    fe_host: str,
    query_port: int = 9030,
    username: str = "root",
    password: str = "",
    database: str | None = None,
) -> Any  # aiomysql connection
```
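A small helper for executing statements over the returned connection, assuming it exposes the standard aiomysql cursor API (cursor(), execute(), fetchall(), close()). Pair it with build_vector_search_query to run similarity searches:

```python
import asyncio  # used to drive the coroutine (e.g., asyncio.run)

async def run_query(conn, sql: str):
    """Execute `sql` on an aiomysql-style connection and return all rows."""
    cur = await conn.cursor()
    try:
        await cur.execute(sql)
        return await cur.fetchall()
    finally:
        await cur.close()
```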
A complete end-to-end example:

```python
from typing import Annotated, Iterator
from dataclasses import dataclass

from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.connectors import doris
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")


@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]


@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield


@coco.fn
async def app_main() -> None:
    table = await doris.mount_table_target(
        DORIS_DB,
        "doc_embeddings",
        await doris.TableSchema.from_class(
            DocEmbedding,
            primary_key=["id"],
        ),
        vector_indexes=[
            doris.VectorIndexDef(
                field_name="embedding",
                index_type="HNSW",
                metric_type="cosine_distance",
            ),
        ],
    )

    # Declare rows (`documents` is assumed to be an iterable of DocEmbedding)
    for doc in documents:
        table.declare_row(row=doc)
```