Back to Prefect

database

docs/integrations/prefect-sqlalchemy/api-ref/prefect_sqlalchemy-database.mdx

3.6.30.dev328.7 KB
Original Source

prefect_sqlalchemy.database

Tasks for querying a database with SQLAlchemy

Functions

check_make_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L23" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
check_make_url(url: str) -> str

Classes

SqlAlchemyConnector <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L34" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Block used to manage authentication with a database using synchronous drivers.

Upon instantiating, an engine is created and maintained for the life of the object until the close method is called.

It is recommended to use this block as a context manager, which will automatically close the engine and its connections when the context is exited.

It is also recommended that this block is loaded and consumed within a single task or flow because if the block is passed across separate tasks and flows, the state of the block's connection and cursor could be lost.

Attributes:

  • connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
  • connect_args: The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments.
  • fetch_size: The number of rows to fetch at a time.

Methods:

block_initialization <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L122" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
block_initialization(self) -> None

Initializes the engine.

close <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L633" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
close(self) -> None

Closes sync connections and its cursors.

execute <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L527" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult

Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.

Unlike the fetch methods, this method will always execute the operation upon calling.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Examples:

Create a table and insert one row into it.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        parameters={"name": "Marvin", "address": "Highway 42"},
    )

execute_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L571" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult

Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.

Unlike the fetch methods, this method will always execute the operation upon calling.

Args:

  • operation: The SQL query or other operation to be executed.
  • seq_of_parameters: The sequence of parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Examples:

Create a table and insert two rows into it.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )

fetch_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L474" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]

Fetch all results from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch all where name is 'Me'.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_all(
        "SELECT * FROM customers WHERE name = :name",
        parameters={"name": "Me"}
    )

fetch_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L417" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]

Fetch a limited number of results from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch two rows repeatedly.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)

fetch_one <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L364" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]

Fetch a single result from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A tuple containing the data returned by the database, where each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch a row repeatedly.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = True
    while results:
        results = database.fetch_one("SELECT * FROM customers")
        print(results)

get_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[Engine, Connection]

Returns either an engine or connection that can be used to query from databases.

Args:

  • client_type: Select from either 'engine' or 'connection'.
  • **get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.

Returns:

  • The authenticated SQLAlchemy engine or connection.

Examples:

Create an engine.

python
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")

Create a context managed connection.

python
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_client(client_type="connection") as conn:
    ...

get_connection <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L201" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> Connection

Returns a connection that can be used to query from databases.

Args:

  • begin: Whether to begin a transaction on the connection; if True, if any operations fail, the entire transaction will be rolled back.
  • **connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect`.

Returns:

  • The SQLAlchemy Connection.

Examples:

Create a synchronous connection as a context-managed transaction.

python
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_connection(begin=False) as connection:
    connection.execute("SELECT * FROM table LIMIT 1;")

get_engine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L151" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> Engine

Returns an authenticated engine that can be used to query from databases.

If an existing engine exists, return that one.

Returns:

  • The authenticated SQLAlchemy Engine.

Examples:

Create a synchronous engine to PostgreSQL using URL params.

python
from prefect import flow
from prefect_sqlalchemy import (
    SqlAlchemyConnector, ConnectionComponents, SyncDriver
)

@flow
def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
            driver=SyncDriver.POSTGRESQL_PSYCOPG2,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

sqlalchemy_credentials_flow()

reset_connections <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L341" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reset_connections(self) -> None

Tries to close all opened connections and their results.

Examples:

Resets connections so fetch_* methods return new results.

python
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    results = database.fetch_one("SELECT * FROM customers")
    database.reset_connections()
    results = database.fetch_one("SELECT * FROM customers")

AsyncSqlAlchemyConnector <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L662" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Block used to manage authentication with a database using asynchronous drivers.

Upon instantiating, an engine is created and maintained for the life of the object until the close method is called.

It is recommended to use this block as an async context manager, which will automatically close the engine and its connections when the context is exited.

It is also recommended that this block is loaded and consumed within a single task or flow because if the block is passed across separate tasks and flows, the state of the block's connection and cursor could be lost.

Attributes:

  • connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
  • connect_args: The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments.
  • fetch_size: The number of rows to fetch at a time.

Methods:

block_initialization <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L758" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
block_initialization(self) -> None

Initializes the engine.

close <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1296" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
close(self) -> None

Closes async connections and its cursors.

execute <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1182" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult

Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.

Unlike the fetch methods, this method will always execute the operation upon calling.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Examples:

Create a table and insert one row into it.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )

asyncio.run(example_run())

execute_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1230" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult

Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.

Unlike the fetch methods, this method will always execute the operation upon calling.

Args:

  • operation: The SQL query or other operation to be executed.
  • seq_of_parameters: The sequence of parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Examples:

Create a table and insert two rows into it.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

asyncio.run(example_run())

fetch_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]

Fetch all results from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch all where name is 'Me'.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_all(
            "SELECT * FROM customers WHERE name = :name",
            parameters={"name": "Me"}
        )

asyncio.run(example_run())

fetch_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1064" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]

Fetch a limited number of results from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch two rows repeatedly.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)

asyncio.run(example_run())

fetch_one <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1007" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]

Fetch a single result from the database.

Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.

Args:

  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.

Returns:

  • A tuple containing the data returned by the database, where each column is a value in the tuple.

Examples:

Create a table, insert three rows into it, and fetch a row repeatedly.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = True
        while results:
            results = await database.fetch_one("SELECT * FROM customers")
            print(results)

asyncio.run(example_run())

get_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L871" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[AsyncEngine, AsyncConnection]

Returns either an engine or connection that can be used to query from databases.

Args:

  • client_type: Select from either 'engine' or 'connection'.
  • **get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.

Returns:

  • The authenticated SQLAlchemy engine or connection.

Examples:

Create an engine.

python
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

sqlalchemy_connector = AsyncSqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")

Create a context managed connection.

python
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

sqlalchemy_connector = AsyncSqlAlchemyConnector.load("BLOCK_NAME")
async with sqlalchemy_connector.get_client(client_type="connection") as conn:
    ...

get_connection <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L834" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> AsyncConnection

Returns a connection that can be used to query from databases.

Args:

  • begin: Whether to begin a transaction on the connection; if True, if any operations fail, the entire transaction will be rolled back.
  • **connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect`.

Returns:

  • The SQLAlchemy AsyncConnection.

Examples:

Create an asynchronous connection as a context-managed transaction.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def main():
    sqlalchemy_connector = await AsyncSqlAlchemyConnector.load("BLOCK_NAME")
    async with sqlalchemy_connector.get_connection(begin=False) as connection:
        await connection.execute("SELECT * FROM table LIMIT 1;")

asyncio.run(main())

get_engine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L784" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> AsyncEngine

Returns an authenticated engine that can be used to query from databases.

If an existing engine exists, return that one.

Returns:

  • The authenticated SQLAlchemy AsyncEngine.

Examples:

Create an asynchronous engine to PostgreSQL using URL params.

python
from prefect import flow
from prefect_sqlalchemy import (
    AsyncSqlAlchemyConnector, ConnectionComponents, AsyncDriver
)

@flow
async def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = AsyncSqlAlchemyConnector(
    connection_info=ConnectionComponents(
            driver=AsyncDriver.POSTGRESQL_ASYNCPG,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

asyncio.run(sqlalchemy_credentials_flow())

reset_connections <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L980" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reset_connections(self) -> None

Tries to close all opened connections and their results.

Examples:

Resets connections so fetch_* methods return new results.

python
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        results = await database.fetch_one("SELECT * FROM customers")
        await database.reset_connections()
        results = await database.fetch_one("SELECT * FROM customers")

asyncio.run(example_run())