Snowflake connector

The snowflake connector provides target state APIs for writing rows to Snowflake tables. CocoIndex tracks the rows that should exist and applies table creation, upserts, updates, and deletes incrementally.

python
from cocoindex.connectors import snowflake

:::note[Install]
Install the optional Snowflake dependency before using this connector:

bash
pip install "cocoindex[snowflake]"

:::

Connection setup

Create a ContextKey[snowflake.ConnectionConfig] to identify the Snowflake connection, then provide it in your lifespan:

:::note
The key name is load-bearing across runs: it is the stable identity CocoIndex uses to track managed rows. See "ContextKey as stable identity" before renaming.
:::

python
import os
from collections.abc import Iterator

import cocoindex as coco
from cocoindex.connectors import snowflake

SNOWFLAKE = coco.ContextKey[snowflake.ConnectionConfig]("snowflake")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    builder.provide(
        SNOWFLAKE,
        snowflake.ConnectionConfig(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USER"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
            role=os.environ.get("SNOWFLAKE_ROLE"),
        ),
    )
    yield

ConnectionConfig

python
@dataclass(frozen=True)
class ConnectionConfig:
    account: str
    user: str
    password: str
    warehouse: str | None = None
    role: str | None = None

Parameters:

  • account - Snowflake account identifier.
  • user - Snowflake username.
  • password - Password for the user.
  • warehouse - Optional warehouse to use for DDL and DML.
  • role - Optional role for the session.

As target

The snowflake connector provides target state APIs for writing rows to tables.

Tables

declare_table_target declares a table as a target state and returns a TableTarget for declaring rows.

python
def declare_table_target(
    db: ContextKey[ConnectionConfig],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    database: str | None = None,
    schema: str | None = None,
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]

Parameters:

  • db - A ContextKey[ConnectionConfig] identifying the connection to use.
  • table_name - Name of the table.
  • table_schema - Schema definition including columns and primary key.
  • database - Optional Snowflake database name.
  • schema - Optional Snowflake schema name.
  • managed_by - Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").

When managed_by="system", CocoIndex creates the database, schema, and table if needed. Table changes use Snowflake DDL, and row changes use MERGE for upserts.
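To make the MERGE-based upsert concrete, here is a rough sketch of how a single-row MERGE statement keyed on the primary key could be assembled. The SQL the connector actually generates is not shown in this page, so the statement shape, the t and s aliases, the %s placeholder style, and the build_merge_sql helper are all assumptions for illustration.

```python
def quote_ident(name: str) -> str:
    """Wrap a name in double quotes, doubling any embedded quote."""
    return '"' + name.replace('"', '""') + '"'


def build_merge_sql(table: str, columns: list[str], primary_key: list[str]) -> str:
    """Hypothetical helper: a parameterized MERGE that upserts one row."""
    if not primary_key:
        raise ValueError("primary_key must name at least one column")
    non_key = [c for c in columns if c not in primary_key]
    # One placeholder per column forms a single-row source relation.
    source = ", ".join(f"%s AS {quote_ident(c)}" for c in columns)
    on = " AND ".join(
        f"t.{quote_ident(c)} = s.{quote_ident(c)}" for c in primary_key
    )
    parts = [
        f"MERGE INTO {quote_ident(table)} AS t",
        f"USING (SELECT {source}) AS s",
        f"ON {on}",
    ]
    if non_key:
        # Only non-key columns are updated when the key already exists.
        updates = ", ".join(
            f"t.{quote_ident(c)} = s.{quote_ident(c)}" for c in non_key
        )
        parts.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    insert_cols = ", ".join(quote_ident(c) for c in columns)
    insert_vals = ", ".join(f"s.{quote_ident(c)}" for c in columns)
    parts.append(
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
    return "\n".join(parts)
```

Calling build_merge_sql("product_index", ["id", "name"], ["id"]) yields a statement with two placeholders, one per column, matched on the id column.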

Rows

Once a TableTarget is resolved, declare rows to be upserted:

python
def TableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None

Parameters:

  • row - A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
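Because declared rows describe the rows that should exist, an incremental update amounts to comparing them with the previously tracked rows by primary key. The sketch below illustrates that idea with plain dicts; CocoIndex's internal tracking is not described here, so plan_changes, row_key, and the in-memory previous mapping are hypothetical.

```python
from collections.abc import Iterable, Mapping
from typing import Any

Row = Mapping[str, Any]
Key = tuple[Any, ...]


def row_key(row: Row, primary_key: list[str]) -> Key:
    """Extract the primary key tuple, rejecting rows that lack a key column."""
    missing = [c for c in primary_key if c not in row]
    if missing:
        raise ValueError(f"row is missing primary key columns: {missing}")
    return tuple(row[c] for c in primary_key)


def plan_changes(
    previous: dict[Key, Row],
    declared: Iterable[Row],
    primary_key: list[str],
) -> tuple[list[Row], list[Key]]:
    """Return (rows to upsert, keys to delete) relative to the previous state."""
    desired = {row_key(r, primary_key): r for r in declared}
    # New or changed rows need an upsert; unchanged rows are skipped.
    upserts = [r for k, r in desired.items() if previous.get(k) != r]
    # Rows tracked before but no longer declared should be deleted.
    deletes = [k for k in previous if k not in desired]
    return upserts, deletes
```

With this shape, a row that is declared again with identical values produces no work, which is the point of applying changes incrementally.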

Table schema: from Python class

Define the table structure using a Python class:

python
from dataclasses import dataclass

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
)

Python types are automatically mapped to Snowflake column types:

| Python Type | Snowflake Type |
| --- | --- |
| bool | BOOLEAN |
| int | NUMBER |
| float | FLOAT |
| decimal.Decimal | NUMBER |
| str | VARCHAR |
| bytes | BINARY |
| uuid.UUID | VARCHAR |
| datetime.date | DATE |
| datetime.time | TIME |
| datetime.datetime | TIMESTAMP_TZ |
| datetime.timedelta | NUMBER |
| list, dict, nested structs | VARIANT |

VARIANT values are JSON-serialized and written with PARSE_JSON.
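The mapping can be pictured as a lookup from Python type to column type, with container types falling through to VARIANT. This is a simplified sketch rather than the connector's implementation: snowflake_type_for and variant_param are hypothetical names, and treating every unlisted type as VARIANT is an assumption made to keep the example short.

```python
import datetime
import decimal
import json
import typing
import uuid

# Scalar mappings copied from the table above.
SCALAR_TYPES: dict[type, str] = {
    bool: "BOOLEAN",
    int: "NUMBER",
    float: "FLOAT",
    decimal.Decimal: "NUMBER",
    str: "VARCHAR",
    bytes: "BINARY",
    uuid.UUID: "VARCHAR",
    datetime.date: "DATE",
    datetime.time: "TIME",
    datetime.datetime: "TIMESTAMP_TZ",
    datetime.timedelta: "NUMBER",
}


def snowflake_type_for(py_type: object) -> str:
    """Hypothetical helper: look up a column type for a Python annotation."""
    # list[float] has origin list; plain classes have no origin.
    origin = typing.get_origin(py_type) or py_type
    # Exact lookup, so bool is not mistaken for int or datetime for date.
    if origin in SCALAR_TYPES:
        return SCALAR_TYPES[origin]
    # Assumption: anything else (list, dict, nested structs) is VARIANT.
    return "VARIANT"


def variant_param(value: object) -> str:
    """JSON text that would be bound to a PARSE_JSON(...) expression."""
    return json.dumps(value)
```

For example, snowflake_type_for(dict[str, object]) returns "VARIANT", and variant_param({"a": [1, 2]}) produces the JSON text that a PARSE_JSON call would receive.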

SnowflakeType

Use SnowflakeType to specify a custom Snowflake type and optional encoder:

python
from dataclasses import dataclass
from typing import Annotated

from cocoindex.connectors.snowflake import SnowflakeType

@dataclass
class ProductRow:
    id: Annotated[int, SnowflakeType("NUMBER(38, 0)")]
    embedding: Annotated[list[float], SnowflakeType("ARRAY")]

You can also pass column_overrides when constructing the schema:

python
schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
    column_overrides={
        "id": snowflake.SnowflakeType("NUMBER(38, 0)"),
    },
)

Example

python
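from dataclasses import dataclass

import cocoindex as coco
from cocoindex.connectors import snowflake

# Same key as in the connection setup above.
SNOWFLAKE = coco.ContextKey[snowflake.ConnectionConfig]("snowflake")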

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

async def declare_products(rows: list[ProductRow]) -> None:
    table = await snowflake.mount_table_target(
        SNOWFLAKE,
        table_name="product_index",
        table_schema=await snowflake.TableSchema.from_class(
            ProductRow,
            primary_key=["id"],
        ),
        database="ANALYTICS",
        schema="PUBLIC",
    )

    for row in rows:
        table.declare_row(row=row)

See examples/snowflake_target for a runnable project.

Identifier handling

Database, schema, table, and column names must be simple Snowflake identifiers containing letters, numbers, and underscores, and must not start with a number. The connector quotes identifiers when generating SQL.
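The identifier rule can be expressed as a regular expression followed by quoting. The sketch below is illustrative only: quote_identifier and qualified_name are hypothetical helpers, and restricting "letters" to ASCII is an assumption.

```python
import re

# Letters, digits, and underscores; the first character is not a digit.
# Assumption: "letters" means ASCII letters.
_SIMPLE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Validate a simple identifier and wrap it in double quotes."""
    if not _SIMPLE_IDENTIFIER.fullmatch(name):
        raise ValueError(f"not a simple Snowflake identifier: {name!r}")
    return f'"{name}"'


def qualified_name(*parts: str) -> str:
    """Join validated, quoted parts, e.g. database, schema, table."""
    return ".".join(quote_identifier(p) for p in parts)
```

Snowflake treats quoted identifiers as case-sensitive, so "product_index" and PRODUCT_INDEX name different objects. Whether the connector normalizes case before quoting is not stated here, so use consistent casing for names that other tools will also query.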