prefect.server.database.query_components
FlowRunGraphV2Node <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L41" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
BaseQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Abstract base class used to inject dialect-specific SQL operations into Prefect.
Methods:
build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.
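To make "sequential key-value pairs" concrete, here is a minimal sketch that uses only the standard library. It calls SQLite's `json_object` function, which takes its arguments in the same alternating key, value, key, value order. The helper name `build_json_object_sql` is hypothetical and is not part of Prefect; the real method returns a SQLAlchemy column element rather than executing anything.

```python
import json
import sqlite3


def build_json_object_sql(*args):
    """Run SQLite's json_object over alternating key/value arguments.

    Hypothetical helper for illustration only; not part of Prefect.
    """
    if len(args) % 2 != 0:
        raise ValueError("expected key, value, key, value, ...")
    placeholders = ", ".join("?" for _ in args)
    conn = sqlite3.connect(":memory:")
    try:
        row = conn.execute(f"SELECT json_object({placeholders})", args).fetchone()
    finally:
        conn.close()
    # SQLite returns the JSON object as text, so decode it for inspection.
    return json.loads(row[0])


result = build_json_object_sql("id", 1, "name", "etl")
print(result)
```

Each odd-positioned argument becomes a key and the argument after it becomes that key's value.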
cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.
clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.
flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).
get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]
get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns all scheduled runs in work queues, subject to provided parameters.
This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling
result.all() returns both elements of each pair; calling result.scalars().unique().all()
returns only the flow runs, because scalars() keeps just the first column of each row.
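The difference between the two result shapes can be modelled in plain Python. This is only a sketch of the row shapes, not SQLAlchemy itself: `FlowRunStub` and `first_column_unique` are hypothetical names standing in for `orm_models.FlowRun` and for what `scalars().unique().all()` does to a result.

```python
from typing import NamedTuple
from uuid import UUID, uuid4


class FlowRunStub(NamedTuple):
    """Hypothetical stand-in for orm_models.FlowRun."""

    id: UUID
    name: str


def first_column_unique(rows):
    """Keep the first column of each row, dropping repeats in order."""
    seen = set()
    unique = []
    for row in rows:
        value = row[0]
        if value not in seen:
            seen.add(value)
            unique.append(value)
    return unique


run_a = FlowRunStub(uuid4(), "run-a")
run_b = FlowRunStub(uuid4(), "run-b")
queue_id = uuid4()

# Shape of result.all(): one (flow run, work queue id) tuple per row.
# The repeated row is included only to show the de-duplication step.
pairs = [(run_a, queue_id), (run_b, queue_id), (run_a, queue_id)]

# Shape of result.scalars().unique().all(): flow runs only.
flow_runs = first_column_unique(pairs)
print([run.name for run in flow_runs])
```

Use the tuple form when the caller needs to know which work queue each run came from, and the scalar form when only the flow runs matter.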
insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]
Returns a dialect-specific insert statement.
json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.
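As a rough illustration of JSON array aggregation, the sketch below uses SQLite's `json_group_array` aggregate through the standard-library `sqlite3` module. The `task` table and its columns are made up for the example, and which SQL function a given dialect's `json_arr_agg` actually emits is not stated here.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (flow TEXT, name TEXT)")  # hypothetical table
conn.executemany(
    "INSERT INTO task VALUES (?, ?)",
    [("a", "extract"), ("a", "load"), ("b", "report")],
)

# Collapse the rows of each group into one JSON array per flow.
rows = conn.execute(
    "SELECT flow, json_group_array(name) FROM task GROUP BY flow ORDER BY flow"
).fetchall()
conn.close()

# SQLite returns each array as JSON text, so decode it for inspection.
aggregated = {flow: json.loads(names) for flow, names in rows}
print(aggregated)
```

The order of elements inside each array depends on the order in which the database visits the rows, so sort the values if a stable order matters.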
make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
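The signature suggests a query that yields (interval start, interval end) pairs covering the range from `start_time` to `end_time`. The pure-Python sketch below shows one plausible reading of that contract. The function name `timestamp_intervals`, the half-open handling of the final interval, and the list return type are all assumptions; the documented method returns a `sa.Select` instead.

```python
import datetime


def timestamp_intervals(start_time, end_time, interval):
    """Split [start_time, end_time) into consecutive (start, end) pairs.

    Hypothetical helper: an interval starts only if it begins before end_time.
    """
    if interval <= datetime.timedelta(0):
        raise ValueError("interval must be positive")
    pairs = []
    cursor = start_time
    while cursor < end_time:
        pairs.append((cursor, cursor + interval))
        cursor += interval
    return pairs


start = datetime.datetime(2024, 1, 1, 0, 0)
end = datetime.datetime(2024, 1, 1, 1, 0)
intervals = timestamp_intervals(start, end, datetime.timedelta(minutes=20))
for interval_start, interval_end in intervals:
    print(interval_start.time(), interval_end.time())
```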
read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Read a configuration value by key.
Configuration values should not be changed at run time, so retrieved values are cached in memory.
Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.
set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.
uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.
AsyncPostgresQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L542" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Methods:
build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L557" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.
cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L554" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.
clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.
flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).
get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]
get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns all scheduled runs in work queues, subject to provided parameters.
This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling
result.all() returns both elements of each pair; calling result.scalars().unique().all()
returns only the flow runs, because scalars() keeps just the first column of each row.
insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L545" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>insert(self, obj: type[orm_models.Base]) -> postgresql.Insert
insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]
Returns a dialect-specific insert statement.
json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L562" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.
make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L567" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Read a configuration value by key.
Configuration values should not be changed at run time, so retrieved values are cached in memory.
Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.
set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L589" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
Given a list of flow run IDs and their associated states, sets the state_id of each flow run to the appropriate state.
set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.
uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L551" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>uses_json_strings(self) -> bool
uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.
UUIDList <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L778" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Maps a JSON list of strings back to a list of UUIDs at the result loading stage.
Methods:
process_result_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L784" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>process_result_value(self, value: Optional[list[Union[str, UUID]]], dialect: sa.Dialect) -> Optional[list[UUID]]
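The conversion implied by the `process_result_value` signature can be sketched in plain Python. The function name `to_uuid_list` is hypothetical, and passing `None` and existing `UUID` instances through unchanged is an assumption drawn from the `Optional[list[Union[str, UUID]]]` parameter type rather than from the implementation.

```python
from typing import List, Optional, Union
from uuid import UUID


def to_uuid_list(value: Optional[List[Union[str, UUID]]]) -> Optional[List[UUID]]:
    """Convert a decoded JSON list of UUID strings into UUID objects."""
    if value is None:
        return None
    # Leave values that are already UUIDs untouched; parse the strings.
    return [item if isinstance(item, UUID) else UUID(item) for item in value]


raw = ["12345678-1234-5678-1234-567812345678", UUID(int=1)]
converted = to_uuid_list(raw)
print(converted)
```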
AioSqliteQueryComponents <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L792" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Methods:
build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
build_json_object <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L98" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.
cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L804" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
cast_to_json <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L94" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.
clear_configuration_value_cache_for_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.
flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L405" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).
get_scheduled_flow_runs_from_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]
get_scheduled_flow_runs_from_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns all scheduled runs in work queues, subject to provided parameters.
This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling
result.all() returns both elements of each pair; calling result.scalars().unique().all()
returns only the flow runs, because scalars() keeps just the first column of each row.
insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L795" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>insert(self, obj: type[orm_models.Base]) -> sqlite.Insert
insert <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L81" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]
Returns a dialect-specific insert statement.
json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L812" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
json_arr_agg <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.
make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L817" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
make_timestamp_intervals <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
read_configuration_value <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Read a configuration value by key.
Configuration values should not be changed at run time, so retrieved values are cached in memory.
Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.
set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L851" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
Given a list of flow run IDs and their associated states, sets the state_id of each flow run to the appropriate state.
set_state_id_on_inserted_flow_runs_statement <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
unique_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L72" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.
uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L801" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>uses_json_strings(self) -> bool
uses_json_strings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/database/query_components.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.