prefect.server.database.query_components

Classes

FlowRunGraphV2Node <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L41" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

BaseQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Abstract base class used to inject dialect-specific SQL operations into Prefect.

Methods:

build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

Builds a JSON object from sequential key-value pairs.
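To illustrate the "sequential key-value pairs" calling convention, here is a minimal plain-Python sketch. `pair_up` is a hypothetical helper, not part of Prefect; the real method builds a SQL expression rather than a dictionary, so this only shows how alternating arguments are grouped.

```python
from typing import Any


def pair_up(*args: Any) -> dict[Any, Any]:
    """Hypothetical helper: group alternating key/value arguments into a dict."""
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of arguments: key, value, ...")
    # args[0::2] are the keys, args[1::2] are the matching values.
    return dict(zip(args[0::2], args[1::2]))


pairs = pair_up("id", 1, "name", "my-flow-run")
print(pairs)
```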

cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_configuration_value_cache_for_key(self, key: str) -> None

Removes a configuration key from the cache.

flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph

Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]

Returns all scheduled runs in work queues, subject to provided parameters.

This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling result.all() returns both elements of each pair; calling result.scalars().unique().all() returns only the flow runs, because scalars() keeps only the first element of each row.
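The difference between the two ways of reading the result can be mimicked in plain Python. This is only an analogy: the rows below are hand-written tuples, strings stand in for `orm_models.FlowRun` objects, and no database or SQLAlchemy call is involved.

```python
from uuid import UUID, uuid4

# Stand-ins for result rows: each row is a (flow_run, work_queue_id) pair.
queue_a, queue_b = uuid4(), uuid4()
rows: list[tuple[str, UUID]] = [
    ("flow-run-1", queue_a),
    ("flow-run-2", queue_b),
    ("flow-run-2", queue_b),  # repeated row, only to show the effect of unique()
]

# Analogue of result.all(): every row keeps both elements.
all_rows = list(rows)

# Analogue of result.scalars().unique().all(): keep only the first element
# of each row, then drop duplicates while preserving order.
flow_runs = list(dict.fromkeys(row[0] for row in rows))

print(all_rows)
print(flow_runs)
```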

insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]

Returns a dialect-specific insert statement.

json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

Aggregates a JSON array.

make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]

Read a configuration value by key.

Configuration values should not be changed at run time, so retrieved values are cached in memory.

Configurations are mainly used for encrypting blocks, so caching them speeds up nested block document queries.
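The caching behavior described here, together with `clear_configuration_value_cache_for_key`, can be pictured as a read-through cache. The sketch below is an assumption-laden illustration, not Prefect's implementation: `ConfigurationCache`, `fake_loader`, and the key name `ENCRYPTION_KEY` are all hypothetical, and the choice not to cache missing values is a guess.

```python
from typing import Any, Callable, Optional


class ConfigurationCache:
    """Hypothetical read-through cache keyed by configuration key."""

    def __init__(self, loader: Callable[[str], Optional[dict[str, Any]]]) -> None:
        self._loader = loader  # stands in for the database query
        self._cache: dict[str, dict[str, Any]] = {}

    def read(self, key: str) -> Optional[dict[str, Any]]:
        # Serve from memory when possible; otherwise load and remember.
        if key in self._cache:
            return self._cache[key]
        value = self._loader(key)
        if value is not None:  # assumption: missing values are not cached
            self._cache[key] = value
        return value

    def clear_key(self, key: str) -> None:
        # Forget one key so the next read goes back to the loader.
        self._cache.pop(key, None)


calls: list[str] = []


def fake_loader(key: str) -> Optional[dict[str, Any]]:
    calls.append(key)  # record every simulated database hit
    return {"value": "secret"} if key == "ENCRYPTION_KEY" else None


cache = ConfigurationCache(fake_loader)
cache.read("ENCRYPTION_KEY")       # loads from the fake database
cache.read("ENCRYPTION_KEY")       # served from memory
cache.clear_key("ENCRYPTION_KEY")
cache.read("ENCRYPTION_KEY")       # loads again after the key was cleared
print(len(calls))
```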

set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
unique_key(self) -> tuple[Hashable, ...]

Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
uses_json_strings(self) -> bool

Specifies whether the configured dialect returns JSON as strings.
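A caller might branch on this flag to decide whether a returned value still needs decoding. The helper below is a hypothetical sketch of that idea, not code from Prefect.

```python
import json
from typing import Any


def load_json_column(value: Any, uses_json_strings: bool) -> Any:
    """Hypothetical helper: decode only when the dialect hands back JSON text."""
    if uses_json_strings and isinstance(value, str):
        return json.loads(value)
    return value


print(load_json_column('{"a": [1, 2]}', uses_json_strings=True))
print(load_json_column({"a": [1, 2]}, uses_json_strings=False))
```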

AsyncPostgresQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L542" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L557" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

Builds a JSON object from sequential key-value pairs.

cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L554" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_configuration_value_cache_for_key(self, key: str) -> None

Removes a configuration key from the cache.

flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph

Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]

Returns all scheduled runs in work queues, subject to provided parameters.

This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling result.all() returns both elements of each pair; calling result.scalars().unique().all() returns only the flow runs, because scalars() keeps only the first element of each row.

insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L545" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
insert(self, obj: type[orm_models.Base]) -> postgresql.Insert

insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]

Returns a dialect-specific insert statement.

json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L562" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

Aggregates a JSON array.

make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L567" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]

Read a configuration value by key.

Configuration values should not be changed at run time, so retrieved values are cached in memory.

Configurations are mainly used for encrypting blocks, so caching them speeds up nested block document queries.

set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L589" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

Given a list of flow run IDs and associated states, sets the state_id to the appropriate state for all flow runs.

set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
unique_key(self) -> tuple[Hashable, ...]

Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L551" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
uses_json_strings(self) -> bool

uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
uses_json_strings(self) -> bool

Specifies whether the configured dialect returns JSON as strings.

UUIDList <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L778" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Maps a JSON list of strings back to a list of UUIDs at the result loading stage.

Methods:

process_result_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L784" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
process_result_value(self, value: Optional[list[Union[str, UUID]]], dialect: sa.Dialect) -> Optional[list[UUID]]
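The conversion this type performs can be sketched as a standalone function with the same input and output types as the signature above. `to_uuid_list` is a hypothetical name, and the handling of `None` and of values that are already `UUID` instances is inferred from the type hints rather than taken from the source.

```python
from typing import Optional, Union
from uuid import UUID


def to_uuid_list(value: Optional[list[Union[str, UUID]]]) -> Optional[list[UUID]]:
    """Hypothetical sketch: turn a decoded JSON list of strings into UUIDs."""
    if value is None:
        return None
    # Leave existing UUID objects alone; parse everything else from text.
    return [v if isinstance(v, UUID) else UUID(v) for v in value]


print(to_uuid_list(["12345678-1234-5678-1234-567812345678"]))
```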

AioSqliteQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L792" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

Builds a JSON object from sequential key-value pairs.

cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L804" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_configuration_value_cache_for_key(self, key: str) -> None

Removes a configuration key from the cache.

flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph

Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]

Returns all scheduled runs in work queues, subject to provided parameters.

This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling result.all() returns both elements of each pair; calling result.scalars().unique().all() returns only the flow runs, because scalars() keeps only the first element of each row.

insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L795" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
insert(self, obj: type[orm_models.Base]) -> sqlite.Insert

insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]

Returns a dialect-specific insert statement.

json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L812" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

Aggregates a JSON array.

make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L817" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]

Read a configuration value by key.

Configuration values should not be changed at run time, so retrieved values are cached in memory.

Configurations are mainly used for encrypting blocks, so caching them speeds up nested block document queries.

set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L851" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

Given a list of flow run IDs and associated states, sets the state_id to the appropriate state for all flow runs.

set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
unique_key(self) -> tuple[Hashable, ...]

Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L801" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
uses_json_strings(self) -> bool

uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
uses_json_strings(self) -> bool

Specifies whether the configured dialect returns JSON as strings.