.agents/skills/backend-code-review/references/sqlalchemy-rule.md
# SQLAlchemy rules

## Scope

Relevant APIs and files:

- `lfx.services.deps.session_scope`, `lfx.services.deps.session_scope_readonly`
- `langflow.services.deps.session_scope`, `langflow.services.deps.session_scope_readonly`
- `lfx.services.deps.injectable_session_scope`, `lfx.services.deps.injectable_session_scope_readonly`
- `langflow.services.database.service.DatabaseService`
- `src/backend/base/langflow/services/database/models/*/crud.py`

Prefer `from langflow.services.deps import session_scope` in Langflow code for consistency. The Langflow wrappers are thin `@asynccontextmanager` functions that delegate to `lfx.services.deps`. Only import directly from `lfx` when working inside the `lfx` package itself. Schema and migration conventions are covered separately (see `db-schema-rule.md`).

## Use the `session_scope()` context manager with explicit transaction awareness

The `session_scope()` context manager provides auto-commit on successful exit and auto-rollback on exception. For read-only paths, use `session_scope_readonly()`, which skips commit overhead. In route handlers, use the injectable variants via `Depends(injectable_session_scope)`. Missing commits can silently drop intended updates, while ad-hoc or long-lived transactions increase contention and deadlock risk.

**Do:**

- Use `async with session_scope() as session:`, which auto-commits on success.
- For reads, use `async with session_scope_readonly() as session:` to avoid unnecessary commit calls.
- In route handlers, inject `session: AsyncSession = Depends(injectable_session_scope)` for writes or `Depends(injectable_session_scope_readonly)` for reads.

**Don't:**

```python
# Creating a raw session without the context manager
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.deps import get_service

db_service = get_service(ServiceType.DATABASE_SERVICE)
session = AsyncSession(db_service.engine)
flow = (await session.execute(select(Flow).where(Flow.id == flow_id))).scalar_one()
flow.name = "Updated"
# Missing commit, session never closed properly
```

**Do:**

```python
from langflow.services.deps import session_scope

async with session_scope() as session:
    flow = (await session.execute(select(Flow).where(Flow.id == flow_id))).scalar_one()
    flow.name = "Updated"
    # session_scope auto-commits on successful exit
```

For read-only operations:

```python
from langflow.services.deps import session_scope_readonly

async with session_scope_readonly() as session:
    flows = (await session.execute(select(Flow).where(Flow.user_id == user_id))).scalars().all()
```
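To make the commit/rollback semantics concrete, here is a minimal, self-contained sketch of a `session_scope()`-style `@asynccontextmanager` wrapper delegating to an inner implementation. This is illustrative only, not the actual `langflow.services.deps` source; the dict stands in for a real `AsyncSession`, and `lfx_session_scope` stands in for the `lfx` implementation:

```python
import asyncio
from contextlib import asynccontextmanager


# Illustrative stand-in for lfx.services.deps.session_scope; a real
# AsyncSession replaces the dict in the actual implementation.
@asynccontextmanager
async def lfx_session_scope():
    session = {"committed": False, "rolled_back": False}
    try:
        yield session
        session["committed"] = True  # auto-commit on successful exit
    except Exception:
        session["rolled_back"] = True  # auto-rollback on exception
        raise


# The Langflow-level wrapper is a thin @asynccontextmanager that delegates.
@asynccontextmanager
async def session_scope():
    async with lfx_session_scope() as session:
        yield session


async def demo():
    async with session_scope() as session:
        pass  # do work; commit happens automatically on exit
    return session


print(asyncio.run(demo())["committed"])  # → True
```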
## Enforce `user_id` scoping on user-owned queries

Scope queries by `user_id` to prevent cross-user data leakage or corruption. Langflow uses `user_id` (not `tenant_id`) for user-scoped data isolation. Every query on user-owned entities (flows, variables, folders, messages, API keys) must include a `user_id` filter.

**Do:** add a `user_id` predicate to all user-owned entity queries and propagate user context through service interfaces. The `current_user.id` is available from the `get_current_active_user` dependency in route handlers.

**Don't:**

```python
# Missing user_id scope: any authenticated user can read any flow
stmt = select(Flow).where(Flow.id == flow_id)
flow = (await session.execute(stmt)).scalar_one_or_none()
```

**Do:**

```python
stmt = select(Flow).where(
    Flow.id == flow_id,
    Flow.user_id == current_user.id,
)
flow = (await session.execute(stmt)).scalar_one_or_none()
```
## Prefer query expressions over raw SQL

Raw SQL via `text()` should be exceptional. ORM/Core expressions are easier to evolve, safer to compose, dialect-portable (SQLite + PostgreSQL), and more consistent with the codebase. Langflow uses `sqlmodel.select()` for queries, which provides both type safety and dialect portability.

**Do:** use `select`/`update`/`delete` expressions; keep raw SQL only when required by clear technical constraints (e.g., database-specific administrative queries).

**Don't:**

```python
from sqlmodel import text

result = await session.execute(
    text("SELECT * FROM flow WHERE id = :id AND user_id = :user_id"),
    {"id": str(flow_id), "user_id": str(user_id)},
)
row = result.first()
```

**Do:**

```python
from sqlmodel import select

stmt = select(Flow).where(
    Flow.id == flow_id,
    Flow.user_id == user_id,
)
flow = (await session.execute(stmt)).scalar_one_or_none()
```
## Avoid blocking calls in async code and inside transactions

Blocking calls (`time.sleep()`, CPU-bound computation, synchronous HTTP requests) block the entire event loop and starve other coroutines. This also extends transaction duration unnecessarily, increasing lock contention.

**Do:** keep external I/O outside transaction scope, and use `asyncio.to_thread()` or `anyio.to_thread.run_sync()` for CPU-bound or synchronous I/O work.

**Don't:**

```python
import requests

async with session_scope() as session:
    flow = (await session.execute(select(Flow).where(Flow.id == flow_id))).scalar_one()
    # Blocking HTTP call inside transaction scope
    response = requests.post("https://external-api.com/validate", json=flow.data)
    flow.validated = response.ok
```

**Do:**

```python
import httpx

async with session_scope_readonly() as session:
    flow = (await session.execute(select(Flow).where(Flow.id == flow_id))).scalar_one()
    flow_data = flow.data

# External call outside transaction scope
async with httpx.AsyncClient() as client:
    response = await client.post("https://external-api.com/validate", json=flow_data)

async with session_scope() as session:
    flow = (await session.execute(select(Flow).where(Flow.id == flow_id))).scalar_one()
    flow.validated = response.is_success
```
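The `asyncio.to_thread()` offload mentioned above can be sketched in a runnable, self-contained form; the `checksum` helper here is a hypothetical stand-in for real CPU-bound work:

```python
import asyncio
import time


def checksum(data: bytes) -> int:
    """Stand-in for CPU-bound work that would otherwise block the event loop."""
    time.sleep(0.05)  # simulate a slow, synchronous computation
    return sum(data) % 256


async def main() -> int:
    # Offload the blocking function to a worker thread; the event loop stays free
    # to run other coroutines while checksum() executes.
    return await asyncio.to_thread(checksum, b"flow-data")


print(asyncio.run(main()))  # → 127
```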
## Guard contested writes with row-count checks and locks

Scope conditional writes by `user_id` and verify affected row counts. Under low contention, use an optimistic `updated_at` guard in `WHERE` and treat `rowcount == 0` as a conflict; under high contention, use a pessimistic `SELECT ... FOR UPDATE`. Note that SQLite ignores `FOR UPDATE`, so pessimistic locking is only effective on PostgreSQL.

**Don't:**

```python
# No user scope, no conflict detection on a contested write path
async with session_scope() as session:
    await session.execute(
        update(Flow).where(Flow.id == flow_id).values(name="Updated")
    )
```

**Do:**

```python
# Optimistic lock (low contention, retry on conflict)
from datetime import datetime, timezone

from sqlmodel import update

async with session_scope() as session:
    result = await session.execute(
        update(Flow)
        .where(
            Flow.id == flow_id,
            Flow.user_id == user_id,
            Flow.updated_at == expected_updated_at,
        )
        .values(name="Updated", updated_at=datetime.now(timezone.utc))
    )
    if result.rowcount == 0:
        raise FlowStateConflictError("Flow was modified concurrently, retry")
```

```python
# Pessimistic lock with SELECT ... FOR UPDATE (high contention)
async with session_scope() as session:
    flow = (await session.execute(
        select(Flow)
        .where(Flow.id == flow_id, Flow.user_id == user_id)
        .with_for_update()
    )).scalar_one()
    flow.name = "Updated"
    flow.updated_at = datetime.now(timezone.utc)
```
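The "retry on conflict" half of the optimistic pattern can be sketched as a small generic retry loop. This is illustrative only: `update_with_retry` and the `attempt_update` stub are hypothetical names, not Langflow APIs, and the stub simulates one conflict before succeeding:

```python
import asyncio


class FlowStateConflictError(Exception):
    """Raised when an optimistic update matched zero rows."""


async def update_with_retry(attempt_update, max_retries: int = 3) -> int:
    """Re-run a conditional update a few times, backing off between tries."""
    for attempt in range(max_retries):
        try:
            return await attempt_update()
        except FlowStateConflictError:
            if attempt == max_retries - 1:
                raise  # give up after the last attempt
            await asyncio.sleep(0.01 * (attempt + 1))  # brief backoff before retry
    raise RuntimeError("unreachable")


# Illustrative stand-in for the optimistic UPDATE: conflicts once, then succeeds.
calls = {"n": 0}


async def attempt_update() -> int:
    calls["n"] += 1
    if calls["n"] == 1:
        raise FlowStateConflictError("Flow was modified concurrently, retry")
    return 1  # affected row count


print(asyncio.run(update_with_retry(attempt_update)))  # → 1, after one retry
```

In real code, `attempt_update` would re-read the row to refresh `expected_updated_at` before re-issuing the guarded `UPDATE`.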
## Split long operations into read/compute/write phases

Long-lived transactions hold locks across expensive work, increasing contention. Keep transactions short by reading in one scope, computing outside any transaction, and writing in a second short scope. Never nest `session_scope()` calls (the context manager is not reentrant).

**Don't:**

```python
async with session_scope() as session:
    flows = (await session.execute(select(Flow).where(Flow.user_id == user_id))).scalars().all()
    for flow in flows:
        # Expensive computation inside transaction
        analysis = await analyze_flow_complexity(flow.data)
        flow.complexity_score = analysis.score
        flow.updated_at = datetime.now(timezone.utc)
```

**Do:**

```python
# Read phase
async with session_scope_readonly() as session:
    flows = (await session.execute(select(Flow).where(Flow.user_id == user_id))).scalars().all()
    flow_data = [(f.id, f.data) for f in flows]

# Compute phase (outside transaction)
updates = {}
for flow_id, data in flow_data:
    analysis = await analyze_flow_complexity(data)
    updates[flow_id] = analysis.score

# Write phase (short transaction)
async with session_scope() as session:
    for flow_id, score in updates.items():
        await session.execute(
            update(Flow)
            .where(Flow.id == flow_id, Flow.user_id == user_id)
            .values(complexity_score=score, updated_at=datetime.now(timezone.utc))
        )
```
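Because `session_scope()` is not reentrant, helpers called inside a scope should accept the open session as a parameter rather than opening their own. A runnable toy sketch of that pattern (the list-backed `session_scope` and `rename_flow` here are illustrative stand-ins, not Langflow code):

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def session_scope():
    # Stand-in scope: records every statement "executed" through it.
    session = []
    yield session


async def rename_flow(session, flow_id, name):
    # The helper receives the caller's session instead of nesting a new scope.
    session.append(("update", flow_id, name))


async def main():
    async with session_scope() as session:
        await rename_flow(session, "abc", "Updated")
        return session


print(asyncio.run(main()))  # → [('update', 'abc', 'Updated')]
```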