prefect_sqlalchemy.database
Tasks for querying a database with SQLAlchemy.
check_make_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L23" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
check_make_url(url: str) -> str
SqlAlchemyConnector <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L34" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Block used to manage authentication with a database using synchronous drivers.
On instantiation, an engine is created and maintained for the life of the object until the close method is called.
Use this block as a context manager where possible; the engine and its connections are then closed automatically when the context is exited.
Load and consume this block within a single task or flow: if the block is passed across separate tasks or flows, the state of its connection and cursor could be lost.
Attributes:
connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
connect_args: The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments.
fetch_size: The number of rows to fetch at a time.
Methods:
block_initialization <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L122" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
block_initialization(self) -> None
Initializes the engine.
close <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L633" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
close(self) -> None
Closes sync connections and their cursors.
execute <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L527" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult
Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Unlike the fetch methods, this method will always execute the operation upon calling.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Examples:
Create a table and insert one row into it.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        parameters={"name": "Marvin", "address": "Highway 42"},
    )
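The `:name` and `:address` placeholders above are bound parameters: values are sent to the driver separately from the SQL text instead of being pasted into it. A minimal sketch of why that matters, using the standard-library `sqlite3` module as a stand-in for the block (an assumption made for illustration; `sqlite3` happens to accept the same `:name` placeholder style):

```python
import sqlite3

# In-memory SQLite database standing in for the block's real connection.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)

# A value full of SQL metacharacters; as a bound parameter it stays plain data.
tricky_name = "Marvin'); DROP TABLE customers; --"
conn.execute(
    "INSERT INTO customers (name, address) VALUES (:name, :address);",
    {"name": tricky_name, "address": "Highway 42"},
)
conn.commit()

# The table still exists and holds the value exactly as supplied.
rows = conn.execute("SELECT name, address FROM customers").fetchall()
print(rows)
```

Building the same statement with string formatting would let the quote in `tricky_name` terminate the literal early, which is the usual reason to prefer the `parameters` argument.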
execute_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L571" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult
Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Unlike the fetch methods, this method will always execute the operation upon calling.
Args:
operation: The SQL query or other operation to be executed.
seq_of_parameters: The sequence of parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Examples:
Create a table and insert three rows into it.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
fetch_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L474" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch all results from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A list of tuples, one per row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch all where name is 'Me'.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_all(
        "SELECT * FROM customers WHERE name = :name",
        parameters={"name": "Me"}
    )
fetch_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L417" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch a limited number of results from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A list of tuples, one per row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch two rows repeatedly.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
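The "repeated calls return the next set of results" behaviour described above can be pictured as one open cursor kept per distinct set of inputs. The sketch below is not the block's implementation; `CachedFetcher` and its `reset` method are hypothetical names, and the standard-library `sqlite3` module stands in for the real driver.

```python
import sqlite3
from typing import Any, Dict, List, Optional, Tuple


class CachedFetcher:
    """Hypothetical helper: keeps one open cursor per (operation, parameters)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._cursors: Dict[Tuple[str, str], sqlite3.Cursor] = {}

    def fetch_many(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        size: int = 2,
    ) -> List[Tuple[Any, ...]]:
        key = (operation, repr(sorted((parameters or {}).items())))
        if key not in self._cursors:
            # First call with these inputs: actually run the query.
            self._cursors[key] = self._conn.execute(operation, parameters or ())
        # Later calls reuse the cursor and continue where it left off.
        return self._cursors[key].fetchmany(size)

    def reset(self) -> None:
        # Close and forget every cursor, so the next fetch re-runs its query.
        for cursor in self._cursors.values():
            cursor.close()
        self._cursors.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (name varchar, address varchar);")
conn.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address);",
    [
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Space"},
        {"name": "Me", "address": "Myway 88"},
    ],
)

fetcher = CachedFetcher(conn)
query = "SELECT name FROM customers ORDER BY rowid"
first = fetcher.fetch_many(query)        # first two rows
second = fetcher.fetch_many(query)       # the remaining row
fetcher.reset()
after_reset = fetcher.fetch_many(query)  # starts from the beginning again
print(first, second, after_reset)
```

Under this model, forgetting to reset means a later call with the same inputs may return an empty list because the cursor is already exhausted.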
fetch_one <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L364" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]
Fetch a single result from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A tuple containing a single row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch a row repeatedly.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = True
    while results:
        results = database.fetch_one("SELECT * FROM customers")
        print(results)
get_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[Engine, Connection]
Returns either an engine or connection that can be used to query from databases.
Args:
client_type: Select from either 'engine' or 'connection'.
**get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.
Returns:
Either an engine or a connection, depending on client_type.
Examples:
Create an engine.
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")
Create a context managed connection.
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_client(client_type="connection") as conn:
    ...
get_connection <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L201" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> Connection
Returns a connection that can be used to query from databases.
Args:
begin: Whether to begin a transaction on the connection; if True and any operation fails, the entire transaction will be rolled back.
**connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect.
Returns:
A connection that can be used to query the database.
Examples:
Create a synchronous connection as a context-managed transaction.
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_connection(begin=False) as connection:
    # SQLAlchemy 2.x only executes raw SQL strings wrapped in text()
    connection.execute(text("SELECT * FROM table LIMIT 1;"))
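The rollback behaviour that `begin=True` promises can be seen in miniature with the standard-library `sqlite3` module, whose connection context manager commits on success and rolls back when an exception escapes. This is only an analogy for the all-or-nothing semantics, not the block's own code path; the primary key on the table is added here purely to provoke a failure.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (name varchar PRIMARY KEY, address varchar);"
)
conn.commit()

try:
    # `with conn:` commits on success and rolls back if an exception escapes.
    with conn:
        conn.execute("INSERT INTO customers VALUES ('Ford', 'Highway 42');")
        # Duplicate primary key: raises sqlite3.IntegrityError.
        conn.execute("INSERT INTO customers VALUES ('Ford', 'Space');")
except sqlite3.IntegrityError:
    pass

# The first insert was undone together with the failed one.
remaining = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
print(remaining)
```

With `begin=False`, by contrast, the caller is responsible for deciding when work is committed or discarded.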
get_engine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L151" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> Engine
Returns an authenticated engine that can be used to query from databases.
If an engine already exists, that engine is returned.
Returns:
The authenticated SQLAlchemy engine.
Examples:
Create a synchronous engine to PostgreSQL using URL params.
from prefect import flow
from prefect_sqlalchemy import (
    SqlAlchemyConnector, ConnectionComponents, SyncDriver
)

@flow
def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=SyncDriver.POSTGRESQL_PSYCOPG2,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

sqlalchemy_credentials_flow()
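For orientation, the components passed in the example above end up as a single SQLAlchemy-style URL of the form `driver://username:password@host:port/database`. The sketch below shows roughly how such a URL is assembled; `Components` and `build_url` are hypothetical stand-ins written for this illustration, and the driver string `"postgresql+psycopg2"` is assumed to be what `SyncDriver.POSTGRESQL_PSYCOPG2` represents.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass
class Components:
    """Hypothetical stand-in for ConnectionComponents, for illustration only."""

    driver: str
    database: str
    username: Optional[str] = None
    password: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None


def build_url(c: Components) -> str:
    """Assemble driver://username:password@host:port/database."""
    credentials = ""
    if c.username:
        # Percent-encode so characters like '@' or ':' cannot break the URL.
        credentials = quote(c.username, safe="")
        if c.password:
            credentials += ":" + quote(c.password, safe="")
        credentials += "@"
    location = c.host or ""
    if c.port is not None:
        location += f":{c.port}"
    return f"{c.driver}://{credentials}{location}/{c.database}"


url = build_url(
    Components(
        driver="postgresql+psycopg2",
        username="prefect",
        password="prefect_password",
        host="localhost",
        port=5432,
        database="postgres",
    )
)
print(url)
```

Passing a ready-made URL string as `connection_info` skips this assembly step, which is the "create from a string" option listed under Attributes.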
reset_connections <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L341" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
reset_connections(self) -> None
Tries to close all opened connections and their results.
Examples:
Reset connections so that fetch_* methods return new results.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    results = database.fetch_one("SELECT * FROM customers")
    database.reset_connections()
    results = database.fetch_one("SELECT * FROM customers")
AsyncSqlAlchemyConnector <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L662" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Block used to manage authentication with a database using asynchronous drivers.
On instantiation, an engine is created and maintained for the life of the object until the close method is called.
Use this block as an async context manager where possible; the engine and its connections are then closed automatically when the context is exited.
Load and consume this block within a single task or flow: if the block is passed across separate tasks or flows, the state of its connection and cursor could be lost.
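The async context manager recommendation comes down to a guarantee that `close` runs when the block exits, even if a query raises. A minimal sketch of that guarantee, using only `asyncio` and a hypothetical `FakeAsyncConnector` class that stands in for the block (it owns no real engine):

```python
import asyncio


class FakeAsyncConnector:
    """Hypothetical stand-in for a block that owns an engine."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real block would dispose of its engine and connections here.
        await asyncio.sleep(0)
        self.closed = True

    async def __aenter__(self) -> "FakeAsyncConnector":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when an exception propagates out.
        await self.close()


async def main() -> FakeAsyncConnector:
    connector = FakeAsyncConnector()
    try:
        async with connector:
            raise RuntimeError("query failed")
    except RuntimeError:
        pass
    return connector


connector = asyncio.run(main())
print(connector.closed)
```

Without the `async with`, the caller would need its own `try`/`finally` around every use to get the same cleanup.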
Attributes:
connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
connect_args: The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments.
fetch_size: The number of rows to fetch at a time.
Methods:
block_initialization <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L758" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
block_initialization(self) -> None
Initializes the engine.
close <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1296" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
close(self) -> None
Closes async connections and their cursors.
execute <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1182" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult
Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Unlike the fetch methods, this method will always execute the operation upon calling.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Examples:
Create a table and insert one row into it.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )

asyncio.run(example_run())
execute_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1230" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult
Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Unlike the fetch methods, this method will always execute the operation upon calling.
Args:
operation: The SQL query or other operation to be executed.
seq_of_parameters: The sequence of parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Examples:
Create a table and insert three rows into it.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

asyncio.run(example_run())
fetch_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch all results from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A list of tuples, one per row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch all where name is 'Me'.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_all(
            "SELECT * FROM customers WHERE name = :name",
            parameters={"name": "Me"}
        )

asyncio.run(example_run())
fetch_many <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1064" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch a limited number of results from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A list of tuples, one per row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch two rows repeatedly.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)

asyncio.run(example_run())
fetch_one <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L1007" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]
Fetch a single result from the database.
Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called.
Args:
operation: The SQL query or other operation to be executed.
parameters: The parameters for the operation.
**execution_options: Options to pass to Connection.execution_options.
Returns:
A tuple containing a single row returned by the database.
Examples:
Create a table, insert three rows into it, and fetch a row repeatedly.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = True
        while results:
            results = await database.fetch_one("SELECT * FROM customers")
            print(results)

asyncio.run(example_run())
get_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L871" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[AsyncEngine, AsyncConnection]
Returns either an engine or connection that can be used to query from databases.
Args:
client_type: Select from either 'engine' or 'connection'.
**get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.
Returns:
Either an async engine or an async connection, depending on client_type.
Examples:
Create an engine.
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

sqlalchemy_connector = AsyncSqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")
Create a context managed connection.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def main():
    sqlalchemy_connector = await AsyncSqlAlchemyConnector.load("BLOCK_NAME")
    # async with is only valid inside an async function
    async with sqlalchemy_connector.get_client(client_type="connection") as conn:
        ...

asyncio.run(main())
get_connection <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L834" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> AsyncConnection
Returns a connection that can be used to query from databases.
Args:
begin: Whether to begin a transaction on the connection; if True and any operation fails, the entire transaction will be rolled back.
**connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect.
Returns:
An async connection that can be used to query the database.
Examples:
Create an asynchronous connection as a context-managed transaction.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector
from sqlalchemy import text

async def main():
    sqlalchemy_connector = await AsyncSqlAlchemyConnector.load("BLOCK_NAME")
    async with sqlalchemy_connector.get_connection(begin=False) as connection:
        # SQLAlchemy 2.x only executes raw SQL strings wrapped in text()
        await connection.execute(text("SELECT * FROM table LIMIT 1;"))

asyncio.run(main())
get_engine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L784" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> AsyncEngine
Returns an authenticated engine that can be used to query from databases.
If an engine already exists, that engine is returned.
Returns:
The authenticated SQLAlchemy async engine.
Examples:
Create an asynchronous engine to PostgreSQL using URL params.
import asyncio

from prefect import flow
from prefect_sqlalchemy import (
    AsyncSqlAlchemyConnector, ConnectionComponents, AsyncDriver
)

@flow
async def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = AsyncSqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=AsyncDriver.POSTGRESQL_ASYNCPG,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

asyncio.run(sqlalchemy_credentials_flow())
reset_connections <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/database.py#L980" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
reset_connections(self) -> None
Tries to close all opened connections and their results.
Examples:
Reset connections so that fetch_* methods return new results.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    # load() must be awaited inside an async function
    async with await AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        results = await database.fetch_one("SELECT * FROM customers")
        await database.reset_connections()
        results = await database.fetch_one("SELECT * FROM customers")

asyncio.run(example_run())