.agents/skills/backend-dev-guidelines/references/database-patterns.md
Complete guide to database access patterns in Langfuse using PostgreSQL (Prisma ORM) and ClickHouse (direct client).
Langfuse uses a dual database architecture:
| Database | Technology | Purpose | Access Pattern |
|---|---|---|---|
| PostgreSQL | Prisma ORM | Transactional data, relational data, CRUD operations | Type-safe ORM with migrations |
| ClickHouse | Direct SQL client | Analytics data, high-volume traces/observations, aggregations | Raw SQL queries with streaming support |
| Redis | ioredis | Queues (BullMQ), caching, rate limiting | Direct client access |
Key Principle: Use PostgreSQL for transactional data and relationships. Use ClickHouse for high-volume analytics and time-series data.
⚠️ Important: All queries must filter by project_id (or projectId) to ensure proper data isolation between tenants. This is essential for the multi-tenant architecture.
import { prisma } from "@langfuse/shared/src/db";
// Direct access to Prisma client
const user = await prisma.user.findUnique({ where: { id } });
Important: Always import from @langfuse/shared/src/db, not @prisma/client directly.
⚠️ ALWAYS include projectId in WHERE clauses for project-scoped data:
// Create
const project = await prisma.project.create({
data: {
name: "My Project",
orgId: organizationId,
},
});
// ✅ GOOD: Read with projectId filter
const trace = await prisma.trace.findUnique({
where: { id: traceId, projectId }, // ← Always include projectId for tenant isolation
include: {
scores: true,
project: { select: { id: true, name: true } },
},
});
// ❌ BAD: Missing projectId filter
// const trace = await prisma.trace.findUnique({
// where: { id: traceId }, // ← Missing projectId!
// });
// Update
await prisma.user.update({
where: { id: userId },
data: { lastLogin: new Date() },
});
// ✅ GOOD: Delete with projectId
await prisma.apiKey.delete({
where: { id: apiKeyId, projectId }, // ← Always include projectId
});
// ✅ GOOD: Count with projectId
const traceCount = await prisma.trace.count({
where: { projectId, userId }, // ← Always include projectId
});
Use Prisma interactive transactions for operations that must be atomic:
const result = await prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: userData });
const project = await tx.project.create({
data: {
name: "Default Project",
orgId: user.id,
},
});
await tx.projectMembership.create({
data: {
userId: user.id,
projectId: project.id,
role: "OWNER",
},
});
return { user, project };
});
Transaction options:
await prisma.$transaction(
async (tx) => {
// Transaction logic
},
{
maxWait: 5000, // Max time to wait for transaction to start (ms)
timeout: 10000, // Max time transaction can run (ms)
},
);
Use select to limit fields:
// ❌ Fetches all fields (including large JSON columns)
const traces = await prisma.trace.findMany({ where: { projectId } });
// ✅ Only fetch needed fields
const traces = await prisma.trace.findMany({
where: { projectId },
select: {
id: true,
name: true,
timestamp: true,
userId: true,
},
});
Prevent N+1 queries with include:
// ❌ N+1 Query Problem
const projects = await prisma.project.findMany();
for (const project of projects) {
// N additional queries
const memberCount = await prisma.projectMembership.count({
where: { projectId: project.id },
});
}
// ✅ Use include or aggregation
const projects = await prisma.project.findMany({
include: {
members: { select: { userId: true, role: true } },
},
});
Pagination:
const PAGE_SIZE = 50;
const traces = await prisma.trace.findMany({
where: { projectId },
orderBy: { timestamp: "desc" },
take: PAGE_SIZE,
skip: page * PAGE_SIZE,
});
import { queryClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";
import { clickhouseClient } from "@langfuse/shared/src/server/clickhouse/client";
ClickHouse uses a singleton client manager that reuses connections:
import { clickhouseClient } from "@langfuse/shared/src/server/clickhouse/client";
// Get client (automatically reuses existing connection)
const client = clickhouseClient();
// For read-only queries (uses a read replica if configured)
const readOnlyClient = clickhouseClient(undefined, "ReadOnly");
ClickHouse access uses raw SQL with parameterized queries; parameters use the {paramName: Type} syntax:
⚠️ Important: All ClickHouse queries must include project_id filter to ensure proper tenant isolation.
Simple query:
import { queryClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";
// ✅ GOOD: Always filter by project_id
const rows = await queryClickhouse<{ id: string; name: string }>({
query: `
SELECT id, name, timestamp
FROM traces
WHERE project_id = {projectId: String} -- ← REQUIRED: Always filter by project_id
AND timestamp >= {startTime: DateTime64(3)}
ORDER BY timestamp DESC
LIMIT {limit: UInt32}
`,
params: {
projectId, // ← Required for tenant isolation
startTime: convertDateToClickhouseDateTime(startDate),
limit: 100,
},
tags: { feature: "tracing", type: "trace" },
});
// ❌ BAD: Missing project_id filter
// const rows = await queryClickhouse({
// query: `SELECT * FROM traces WHERE timestamp >= {startTime: DateTime64(3)}`,
// params: { startTime },
// });
Streaming query (for large result sets):
import { queryClickhouseStream } from "@langfuse/shared/src/server/repositories/clickhouse";
// Stream results to avoid loading all rows in memory
for await (const row of queryClickhouseStream<ObservationRecordReadType>({
query: `
SELECT *
FROM observations
WHERE project_id = {projectId: String}
AND start_time >= {startTime: DateTime64(3)}
`,
params: { projectId, startTime },
})) {
// Process row by row
await processObservation(row);
}
Upsert (insert) operation:
import { upsertClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";
await upsertClickhouse({
table: "traces",
records: [
{
id: traceId,
project_id: projectId,
timestamp: new Date(),
name: "API Call",
user_id: userId,
// ... other fields
},
],
eventBodyMapper: (record) => ({
// Transform record for event log
id: record.id,
name: record.name,
// ... other fields
}),
tags: { feature: "ingestion", type: "trace" },
});
DDL/Administrative commands:
import { commandClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";
// Create table, alter schema, etc.
await commandClickhouse({
query: `
ALTER TABLE traces
ADD COLUMN IF NOT EXISTS new_field String
`,
tags: { feature: "migration" },
});
| JavaScript Type | ClickHouse Param Type |
|---|---|
| string | String |
| number | UInt32, Int64, Float64 |
| Date | DateTime64(3) (use convertDateToClickhouseDateTime()) |
| boolean | UInt8 (0 or 1) |
| string[] | Array(String) |
Date handling:
import { convertDateToClickhouseDateTime } from "@langfuse/shared/src/server/clickhouse/client";
const params = {
startTime: convertDateToClickhouseDateTime(new Date()),
};
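To make the conversion concrete, here is a minimal sketch of what a DateTime64(3) parameter value looks like on the wire. This is an illustrative re-implementation only (the real convertDateToClickhouseDateTime is exported from @langfuse/shared and may differ, e.g. in timezone handling); it assumes UTC and millisecond precision.

```typescript
// Illustrative sketch: format a JS Date as a ClickHouse DateTime64(3)
// literal ("YYYY-MM-DD hh:mm:ss.mmm"), assuming UTC.
function toClickhouseDateTime64(date: Date): string {
  const pad = (n: number, width = 2) => String(n).padStart(width, "0");
  const ymd = `${date.getUTCFullYear()}-${pad(date.getUTCMonth() + 1)}-${pad(date.getUTCDate())}`;
  const hms = `${pad(date.getUTCHours())}:${pad(date.getUTCMinutes())}:${pad(date.getUTCSeconds())}`;
  return `${ymd} ${hms}.${pad(date.getUTCMilliseconds(), 3)}`;
}
```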
1. Always filter by project_id for tenant isolation:
// ✅ CORRECT: project_id filter is required
const query = `
SELECT *
FROM traces
WHERE project_id = {projectId: String} -- ← Required for tenant isolation
AND timestamp >= {startTime: DateTime64(3)}
`;
// ❌ WRONG: Missing project_id filter
// const query = `
// SELECT * FROM traces WHERE timestamp >= {startTime: DateTime64(3)}
// `;
Why this is important:
- The project_id filter ensures queries only access data from the intended tenant
2. Use LIMIT BY for deduplication:
// Get latest version of each trace
const query = `
SELECT *
FROM traces
WHERE project_id = {projectId: String} -- ← Always include project_id
ORDER BY event_ts DESC
LIMIT 1 BY id, project_id
`;
3. Use time-based filtering for performance:
// Combine project_id filter with timestamp for optimal performance
const query = `
SELECT *
FROM observations
WHERE project_id = {projectId: String} -- ← Required for tenant isolation
AND start_time >= {startTime: DateTime64(3)} -- ← Improves performance
AND start_time < {endTime: DateTime64(3)}
`;
4. Use CTEs for complex queries (still require project_id):
const query = `
WITH observations_agg AS (
SELECT
trace_id,
count() as observation_count,
sum(total_cost) as total_cost
FROM observations
WHERE project_id = {projectId: String} -- ← Filter in CTE
GROUP BY trace_id
)
SELECT
t.id,
t.name,
o.observation_count,
o.total_cost
FROM traces t
LEFT JOIN observations_agg o ON t.id = o.trace_id
WHERE t.project_id = {projectId: String} -- ← Filter in main query
`;
Note: When using CTEs or subqueries, ensure project_id filter is applied at each level.
Error handling with retries:
ClickHouse queries automatically retry on network errors (socket hang up). Add custom handling for resource-limit errors:
import {
queryClickhouse,
ClickHouseResourceError,
} from "@langfuse/shared/src/server/repositories/clickhouse";
try {
const rows = await queryClickhouse({ query, params });
} catch (error) {
if (error instanceof ClickHouseResourceError) {
// Memory limit, timeout, or overcommit error
throw new Error(ClickHouseResourceError.ERROR_ADVICE_MESSAGE);
}
throw error;
}
Langfuse uses repositories in packages/shared/src/server/repositories/ for complex data access patterns.
✅ Use repositories when:
❌ Use direct Prisma/ClickHouse for:
Trace repository (ClickHouse):
// packages/shared/src/server/repositories/traces.ts
export const getTracesByIds = async (
projectId: string,
traceIds: string[],
): Promise<TraceRecordReadType[]> => {
const rows = await queryClickhouse<TraceRecordReadType>({
query: `
SELECT *
FROM traces
WHERE project_id = {projectId: String}
AND id IN ({traceIds: Array(String)})
ORDER BY event_ts DESC
LIMIT 1 BY id, project_id
`,
params: { projectId, traceIds },
tags: { feature: "tracing", type: "trace" },
});
return rows.map(convertClickhouseToDomain);
};
Score repository (PostgreSQL + ClickHouse):
// Repositories can query both databases
export const getScoresByTraceId = async (
projectId: string,
traceId: string,
) => {
// Use ClickHouse for analytics
const clickhouseScores = await queryClickhouse<ScoreRecordReadType>({
query: `
SELECT *
FROM scores
WHERE project_id = {projectId: String}
AND trace_id = {traceId: String}
`,
params: { projectId, traceId },
});
// Use Prisma for config data
const scoreConfigs = await prisma.scoreConfig.findMany({
where: { projectId },
});
return enrichScoresWithConfigs(clickhouseScores, scoreConfigs);
};
| Use Case | Database | Reasoning |
|---|---|---|
| User accounts, projects, API keys | PostgreSQL | Transactional data with strong consistency |
| Prompt management, dataset definitions | PostgreSQL | Configuration data with relations |
| Project settings, RBAC permissions | PostgreSQL | Small, frequently updated data |
| Traces, observations, events | ClickHouse | High-volume time-series data |
| Score aggregations, analytics queries | ClickHouse | Fast aggregations over millions of rows |
| Usage metrics, cost calculations | ClickHouse | Analytical queries with GROUP BY |
| Exports, large dataset queries | ClickHouse | Streaming support for large result sets |
Decision flow:
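The decision table above can be condensed into a simple rule. This is a hypothetical sketch (the helper name and option names are illustrative, not part of the codebase):

```typescript
type Db = "postgres" | "clickhouse";

// Hypothetical helper encoding the decision table above as a rule.
function chooseDatabase(opts: {
  transactional: boolean; // strong consistency, relations, CRUD
  highVolume: boolean; // millions of rows, time-series data
  needsAggregation: boolean; // GROUP BY / analytics over large data
}): Db {
  if (opts.transactional) return "postgres";
  if (opts.highVolume || opts.needsAggregation) return "clickhouse";
  return "postgres"; // default: small, relational, consistent data
}
```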
Project-scoped tables (MUST filter by project_id):
- traces - All trace queries require project_id
- observations - All observation queries require project_id
- scores - All score queries require project_id
- events - All event queries require project_id
- dataset_run_items_rmt - All dataset run queries require project_id
Global tables (no project_id filter needed):
- users - User management (use id for filtering)
- organizations - Organization data (use id for filtering)
Example of correct filtering:
// ✅ CORRECT: Project-scoped query
const traces = await queryClickhouse({
query: `
SELECT * FROM traces
WHERE project_id = {projectId: String}
AND timestamp >= {startTime: DateTime64(3)}
`,
params: { projectId, startTime },
});
// ✅ CORRECT: Global table query (no project_id needed)
const user = await prisma.user.findUnique({
where: { id: userId },
});
// ❌ WRONG: Project-scoped query without project_id filter
// const traces = await queryClickhouse({
// query: `SELECT * FROM traces WHERE timestamp >= {startTime: DateTime64(3)}`,
// });
import { Prisma } from "@prisma/client"; // error/type imports only; the client itself comes from @langfuse/shared/src/db
import { prisma } from "@langfuse/shared/src/db";
try {
await prisma.user.create({ data: userData });
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
// Unique constraint violation
if (error.code === "P2002") {
const target = error.meta?.target as string[];
throw new ConflictError(`${target?.join(", ")} already exists`);
}
// Foreign key constraint
if (error.code === "P2003") {
throw new ValidationError("Invalid reference");
}
// Record not found
if (error.code === "P2025") {
throw new NotFoundError("Record not found");
}
// Record required to connect not found
if (error.code === "P2018") {
throw new ValidationError("Related record not found");
}
}
// Unknown error
logger.error("Prisma error", { error });
throw error;
}
Common Prisma error codes:
| Code | Meaning | Typical Cause |
|---|---|---|
| P2002 | Unique constraint violation | Duplicate email, API key, etc. |
| P2003 | Foreign key constraint | Referenced record doesn't exist |
| P2025 | Record not found | Update/delete of non-existent record |
| P2018 | Required relation not found | Connect to non-existent related record |
import {
queryClickhouse,
ClickHouseResourceError,
} from "@langfuse/shared/src/server/repositories/clickhouse";
try {
const rows = await queryClickhouse({ query, params });
} catch (error) {
// ClickHouse resource errors (memory limit, timeout, overcommit)
if (error instanceof ClickHouseResourceError) {
logger.warn("ClickHouse resource error", {
errorType: error.errorType, // "MEMORY_LIMIT" | "OVERCOMMIT" | "TIMEOUT"
message: error.message,
});
// User-friendly error message
throw new BadRequestError(ClickHouseResourceError.ERROR_ADVICE_MESSAGE);
}
// Network/connection errors are automatically retried
logger.error("ClickHouse error", { error });
throw error;
}
ClickHouse error types:
| Error Type | Discriminator | Meaning | Solution |
|---|---|---|---|
| MEMORY_LIMIT | "memory limit exceeded" | Query used too much memory | Use more specific filters or a shorter time range |
| OVERCOMMIT | "OvercommitTracker" | Memory overcommit limit hit | Reduce query complexity or result set size |
| TIMEOUT | "Timeout", "timed out" | Query took too long | Add filters, reduce time range, or optimize the query |
ClickHouse retries:
ClickHouse queries automatically retry network errors (socket hang up) with exponential backoff. Configure retry behavior:
// In packages/shared/src/env.ts
LANGFUSE_CLICKHOUSE_QUERY_MAX_ATTEMPTS: z.coerce.number().positive().default(3)
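The retry mechanism described above can be sketched as a generic backoff loop. This is an illustrative sketch only, not the actual implementation inside @langfuse/shared; the function name and delay values are assumptions:

```typescript
// Illustrative sketch of retry-with-exponential-backoff: attempt the
// operation up to maxAttempts times, doubling the delay between attempts.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Exponential backoff: baseDelayMs, 2x, 4x, ...
        await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}
```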
Related Files: