docs/database/queries.md
Queries are used to retrieve data from a database.
A query is a request for information from a database table or combination of tables. A query can be used to retrieve data from a single table or multiple tables. A query can also be used to insert, update, or delete data from a table.
To execute a query you must first create a rx.session. You can use the session
to query the database using SQLModel or SQLAlchemy syntax.
The rx.session statement will automatically close the session when the code
block is finished. If session.commit() is not called, the changes will be
rolled back and not persisted to the database. The code can also explicitly
rollback without closing the session via session.rollback().
The following example shows how to create a session and query the database.
First we create a table called User.
class User(rx.Model, table=True):
username: str
email: str
Then we create a session and query the User table.
class QueryUser(rx.State):
name: str
users: list[User]
@rx.event
def get_users(self):
with rx.session() as session:
self.users = session.exec(
User.select().where(User.username.contains(self.name))
).all()
The get_users method will query the database for all users that contain the
value of the state var name.
Similarly, the session.add() method to add a new record to the
database or persist an existing object.
class AddUser(rx.State):
username: str
email: str
@rx.event
def add_user(self):
with rx.session() as session:
session.add(User(username=self.username, email=self.email))
session.commit()
To update the user, first query the database for the object, make the desired
modifications, .add the object to the session and finally call .commit().
class ChangeEmail(rx.State):
username: str
email: str
@rx.event
def modify_user(self):
with rx.session() as session:
user = session.exec(
User.select().where((User.username == self.username))
).first()
user.email = self.email
session.add(user)
session.commit()
To delete a user, first query the database for the object, then call
.delete() on the session and finally call .commit().
class RemoveUser(rx.State):
username: str
@rx.event
def delete_user(self):
with rx.session() as session:
user = session.exec(
User.select().where(User.username == self.username)
).first()
session.delete(user)
session.commit()
The objects returned by queries are bound to the session that created them, and cannot generally be used outside that session. After adding or updating an object, not all fields are automatically updated, so accessing certain attributes may trigger additional queries to refresh the object.
To avoid this, the session.refresh() method can be used to update the object explicitly and
ensure all fields are up to date before exiting the session.
class AddUserForm(rx.State):
user: User | None = None
@rx.event
def add_user(self, form_data: dict[str, Any]):
with rx.session() as session:
self.user = User(**form_data)
session.add(self.user)
session.commit()
session.refresh(self.user)
Now the self.user object will have a correct reference to the autogenerated
primary key, id, even though this was not provided when the object was created
from the form data.
If self.user needs to be modified or used in another query in a new session,
it must be added to the session. Adding an object to a session does not
necessarily create the object, but rather associates it with a session where it
may either be created or updated accordingly.
class AddUserForm(rx.State):
...
@rx.event
def update_user(self, form_data: dict[str, Any]):
if self.user is None:
return
with rx.session() as session:
self.user.set(**form_data)
session.add(self.user)
session.commit()
session.refresh(self.user)
If an ORM object will be referenced and accessed outside of a session, you
should call .refresh() on it to avoid stale object exceptions.
Avoiding SQL is one of the main benefits of using an ORM, but sometimes it is necessary for particularly complex queries, or when using database-specific features.
SQLModel exposes the session.execute() method that can be used to execute raw
SQL strings. If parameter binding is needed, the query may be wrapped in
sqlalchemy.text,
which allows colon-prefix names to be used as placeholders.
# Never use string formatting to construct SQL queries, as this may lead to SQL injection vulnerabilities in the app.
import sqlalchemy
import reflex as rx
class State(rx.State):
@rx.event
def insert_user_raw(self, username, email):
with rx.session() as session:
session.execute(
sqlalchemy.text(
"INSERT INTO user (username, email) VALUES (:username, :email)"
),
{"username": username, "email": email},
)
session.commit()
@rx.var
def raw_user_tuples(self) -> list[list]:
with rx.session() as session:
return [list(row) for row in session.execute("SELECT * FROM user").all()]
Reflex provides an async version of the session function called rx.asession for asynchronous database operations. This is useful when you need to perform database operations in an async context, such as within async event handlers.
The rx.asession function returns an async SQLAlchemy session that must be used with an async context manager. Most operations against the asession must be awaited.
import sqlalchemy.ext.asyncio
import sqlalchemy
import reflex as rx
class AsyncUserState(rx.State):
users: list[User] = []
@rx.event(background=True)
async def get_users_async(self):
async with rx.asession() as asession:
result = await asession.execute(User.select())
async with self:
self.users = result.all()
The following example shows how to query the database asynchronously:
class AsyncQueryUser(rx.State):
name: str
users: list[User] = []
@rx.event(background=True)
async def get_users(self):
async with rx.asession() as asession:
stmt = User.select().where(User.username.contains(self.name))
result = await asession.execute(stmt)
async with self:
self.users = result.all()
To add a new record to the database asynchronously:
class AsyncAddUser(rx.State):
username: str
email: str
@rx.event(background=True)
async def add_user(self):
async with rx.asession() as asession:
asession.add(User(username=self.username, email=self.email))
await asession.commit()
To update a user asynchronously:
class AsyncChangeEmail(rx.State):
username: str
email: str
@rx.event(background=True)
async def modify_user(self):
async with rx.asession() as asession:
stmt = User.select().where(User.username == self.username)
result = await asession.execute(stmt)
user = result.first()
if user:
user.email = self.email
asession.add(user)
await asession.commit()
To delete a user asynchronously:
class AsyncRemoveUser(rx.State):
username: str
@rx.event(background=True)
async def delete_user(self):
async with rx.asession() as asession:
stmt = User.select().where(User.username == self.username)
result = await asession.execute(stmt)
user = result.first()
if user:
await asession.delete(user)
await asession.commit()
Similar to the regular session, you can refresh an object to ensure all fields are up to date:
class AsyncAddUserForm(rx.State):
user: User | None = None
@rx.event(background=True)
async def add_user(self, form_data: dict[str, str]):
async with rx.asession() as asession:
async with self:
self.user = User(**form_data)
asession.add(self.user)
await asession.commit()
await asession.refresh(self.user)
You can also execute raw SQL asynchronously:
class AsyncRawSQL(rx.State):
users: list[list] = []
@rx.event(background=True)
async def insert_user_raw(self, username, email):
async with rx.asession() as asession:
await asession.execute(
sqlalchemy.text(
"INSERT INTO user (username, email) VALUES (:username, :email)"
),
dict(username=username, email=email),
)
await asession.commit()
@rx.event(background=True)
async def get_raw_users(self):
async with rx.asession() as asession:
result = await asession.execute("SELECT * FROM user")
async with self:
self.users = [list(row) for row in result.all()]
# Important Notes for Async Database Operations
- Always use the `@rx.event(background=True)` decorator for async event handlers
- Most operations against the `asession` must be awaited, including `commit()`, `execute()`, `refresh()`, and `delete()`
- The `add()` method does not need to be awaited
- Result objects from queries have methods like `all()` and `first()` that are synchronous and return data directly
- Use `async with self:` when updating state variables in background tasks