# @mastra/pg
PostgreSQL implementation for Mastra, providing both vector similarity search (using pgvector) and general storage capabilities with connection pooling and transaction support.
```bash
npm install @mastra/pg
```
PgVector supports multiple connection methods:
1. Connection String (Recommended)
```typescript
import { PgVector } from '@mastra/pg';

const vectorStore = new PgVector({
  connectionString: 'postgresql://user:pass@localhost:5432/db',
});
```
2. Host/Port/Database Configuration
```typescript
const vectorStore = new PgVector({
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  user: 'postgres',
  password: 'password',
});
```
Note: PgVector also supports advanced configurations, such as the Google Cloud SQL Connector, via `pg.ClientConfig`.
Pooling and schema behavior can be tuned through the constructor:

```typescript
const vectorStore = new PgVector({
  connectionString: 'postgresql://user:pass@localhost:5432/db',
  schemaName: 'custom_schema', // Use a custom schema (default: public)
  max: 30, // Max pool connections (default: 20)
  idleTimeoutMillis: 60000, // Idle timeout (default: 30000)
  pgPoolOptions: {
    // Additional pg pool options
    connectionTimeoutMillis: 5000,
    allowExitOnIdle: true,
  },
});
```
```typescript
// Create a new table with vector support
await vectorStore.createIndex({
  indexName: 'my_vectors',
  dimension: 1536,
  metric: 'cosine',
  // Optional: Configure index type and parameters
  indexConfig: {
    type: 'hnsw', // 'ivfflat' (default), 'hnsw', or 'flat'
    hnsw: {
      m: 16, // Number of connections per layer (default: 8)
      efConstruction: 64, // Size of dynamic candidate list (default: 32)
    },
  },
});
```
```typescript
// Add vectors
const ids = await vectorStore.upsert({
  indexName: 'my_vectors',
  vectors: [[0.1, 0.2, ...], [0.3, 0.4, ...]],
  metadata: [{ text: 'doc1' }, { text: 'doc2' }],
});

// Query vectors
const results = await vectorStore.query({
  indexName: 'my_vectors',
  queryVector: [0.1, 0.2, ...],
  topK: 10, // Number of results to return
  filter: { text: 'doc1' }, // Metadata filter
  includeVector: false, // Whether to return the raw vectors
  minScore: 0.5, // Minimum similarity score
});

// Clean up
await vectorStore.disconnect();
```
```typescript
import { PostgresStore } from '@mastra/pg';

const store = new PostgresStore({
  host: 'localhost',
  port: 5432,
  database: 'mastra',
  user: 'postgres',
  password: 'postgres',
});
```
```typescript
// Create a thread
await store.saveThread({
  thread: {
    id: 'thread-123',
    resourceId: 'resource-456',
    title: 'My Thread',
    metadata: { key: 'value' },
    createdAt: new Date(),
  },
});

// Add messages to the thread
await store.saveMessages({
  messages: [
    {
      id: 'msg-789',
      threadId: 'thread-123',
      role: 'user',
      content: { content: 'Hello' },
      resourceId: 'resource-456',
      createdAt: new Date(),
    },
  ],
});

// Query threads and messages
const savedThread = await store.getThreadById({ threadId: 'thread-123' });
const messages = await store.listMessages({ threadId: 'thread-123' });
```
Both PgVector and PostgresStore support multiple connection methods:
Connection String
```typescript
{
  connectionString: 'postgresql://user:pass@localhost:5432/db',
}
```
Host/Port/Database
```typescript
{
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  user: 'postgres',
  password: 'password',
}
```
Advanced: `pg.ClientConfig` is also supported, for use cases like the Google Cloud SQL Connector with IAM authentication.
- `schemaName`: Custom PostgreSQL schema (default: `public`)
- `ssl`: Enable SSL or provide custom SSL options (`true | false | ConnectionOptions`)
- `max`: Maximum pool connections (default: 20)
- `idleTimeoutMillis`: Idle connection timeout (default: 30000)
- `pgPoolOptions`: Additional pg pool options (PgVector only)

The following filter operators are supported for metadata queries:
- Comparison: `$eq`, `$ne`, `$gt`, `$gte`, `$lt`, `$lte`
- Logical: `$and`, `$or`
- Array: `$in`, `$nin`
- Text: `$regex`, `$like`

Example filter:
```typescript
{
  $and: [{ age: { $gt: 25 } }, { tags: { $in: ['tag1', 'tag2'] } }]
}
```
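To build intuition for how these operators map onto SQL over the JSONB metadata column, here is a deliberately simplified, hypothetical translation covering only `$and`, `$gt`, `$in`, and plain equality. It is an illustration of the idea, not the actual query builder in `@mastra/pg` (which also parameterizes values rather than inlining them):

```typescript
type Filter = Record<string, any>;

// Hypothetical sketch: translate a metadata filter into a SQL condition
// over a JSONB `metadata` column. Supports only $and, $gt, $in, and equality.
// Real implementations must use bound parameters, not string interpolation.
function filterToSql(filter: Filter): string {
  const clauses: string[] = [];
  for (const [key, value] of Object.entries(filter)) {
    if (key === '$and') {
      // Each element of $and is itself a filter; join them with AND.
      clauses.push('(' + value.map(filterToSql).join(' AND ') + ')');
    } else if (typeof value === 'object' && value !== null) {
      for (const [op, operand] of Object.entries(value)) {
        if (op === '$gt') {
          clauses.push(`(metadata->>'${key}')::numeric > ${operand}`);
        } else if (op === '$in') {
          const list = (operand as any[]).map((v) => `'${v}'`).join(', ');
          clauses.push(`metadata->>'${key}' IN (${list})`);
        }
      }
    } else {
      // Bare value means equality.
      clauses.push(`metadata->>'${key}' = '${value}'`);
    }
  }
  return clauses.join(' AND ');
}

console.log(filterToSql({ $and: [{ age: { $gt: 25 } }, { tags: { $in: ['tag1', 'tag2'] } }] }));
// ((metadata->>'age')::numeric > 25 AND metadata->>'tags' IN ('tag1', 'tag2'))
```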
pgvector supports three index types, each with different performance characteristics:
IVFFlat groups vectors into clusters for efficient searching:
```typescript
await vectorStore.createIndex({
  indexName: 'my_vectors',
  dimension: 1536,
  metric: 'cosine',
  indexConfig: {
    type: 'ivfflat',
    ivf: {
      lists: 1000, // Number of clusters (default: auto-calculated as sqrt(rows) * 2)
    },
  },
});
```
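The auto-calculated default follows the `sqrt(rows) * 2` rule of thumb noted above. As a quick sanity check of what that yields at different table sizes (plain TypeScript, independent of the library, which computes this internally):

```typescript
// Sketch of the documented default: lists ≈ sqrt(rowCount) * 2.
// Illustrative only; not the library's actual implementation.
function defaultIvfLists(rowCount: number): number {
  return Math.max(1, Math.round(Math.sqrt(rowCount) * 2));
}

console.log(defaultIvfLists(10000));  // 200
console.log(defaultIvfLists(250000)); // 1000
```

More lists mean smaller clusters, so queries probe fewer rows per cluster but risk missing near neighbors in unprobed clusters.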
HNSW builds a graph structure for extremely fast searches:
```typescript
await vectorStore.createIndex({
  indexName: 'my_vectors',
  dimension: 1536,
  metric: 'dotproduct', // Recommended for normalized embeddings (OpenAI, etc.)
  indexConfig: {
    type: 'hnsw',
    hnsw: {
      m: 16, // Connections per layer (default: 8, range: 2-100)
      efConstruction: 64, // Dynamic list size (default: 32, range: 4-1000)
    },
  },
});
```
Tuning HNSW:

- `m`: Better accuracy, more memory (use 16-32 for high accuracy)
- `efConstruction`: Better index quality, slower builds (use 64-200 for quality)

The flat index type uses a sequential scan for 100% accuracy:
```typescript
await vectorStore.createIndex({
  indexName: 'my_vectors',
  dimension: 1536,
  metric: 'cosine',
  indexConfig: {
    type: 'flat',
  },
});
```
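A flat index is exact brute-force search: score every stored vector against the query and keep the top K, which is why accuracy is 100% but cost grows linearly with table size. Conceptually (illustrative TypeScript, not pgvector's actual implementation):

```typescript
// Conceptual sketch of what a flat (sequential-scan) search does:
// compute the metric against every stored vector and return the top-K indices.
function cosineSim(a: number[], b: number[]): number {
  const dot = a.reduce((s, ai, i) => s + ai * b[i], 0);
  const na = Math.sqrt(a.reduce((s, x) => s + x * x, 0));
  const nb = Math.sqrt(b.reduce((s, x) => s + x * x, 0));
  return dot / (na * nb);
}

function flatSearch(vectors: number[][], query: number[], topK: number): number[] {
  return vectors
    .map((v, i) => ({ i, score: cosineSim(v, query) })) // score every row
    .sort((x, y) => y.score - x.score)                  // best first
    .slice(0, topK)
    .map((r) => r.i);
}

console.log(flatSearch([[1, 0], [0, 1], [0.9, 0.1]], [1, 0], 2)); // [ 0, 2 ]
```

IVFFlat and HNSW trade a little of this exactness for sublinear search time.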
Choose the appropriate metric for your embeddings:

- `cosine` (default): Angular similarity, good for text embeddings
- `euclidean`: L2 distance, for unnormalized embeddings
- `dotproduct`: Dot product, optimal for normalized embeddings (OpenAI, Cohere)

The system automatically detects configuration changes and only rebuilds indexes when necessary, avoiding the cost of unnecessary recreations.
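On why `dotproduct` suits normalized embeddings: for unit-length vectors the dot product equals cosine similarity, so it produces the same ranking while skipping the normalization divide. A quick standalone check (plain TypeScript, assumes nothing about the library):

```typescript
// For unit-length vectors, dot(a, b) === cosine(a, b).
function dot(a: number[], b: number[]): number {
  return a.reduce((sum, ai, i) => sum + ai * b[i], 0);
}

function norm(a: number[]): number {
  return Math.sqrt(dot(a, a));
}

function cosine(a: number[], b: number[]): number {
  return dot(a, b) / (norm(a) * norm(b));
}

function normalize(a: number[]): number[] {
  const n = norm(a);
  return a.map((x) => x / n);
}

const a = normalize([3, 4]); // [0.6, 0.8]
const b = normalize([4, 3]); // [0.8, 0.6]
console.log(Math.abs(cosine(a, b) - dot(a, b)) < 1e-12); // true
```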
Important behaviors:

- If no `indexConfig` is provided, existing indexes are preserved as-is
- If `indexConfig` is provided, indexes are only rebuilt when the configuration differs

PgVector methods:

- `createIndex({ indexName, dimension, metric?, indexConfig?, buildIndex? })`: Create a new table with vector support
- `buildIndex({ indexName, metric?, indexConfig? })`: Build or rebuild a vector index
- `upsert({ indexName, vectors, metadata?, ids? })`: Add or update vectors
- `query({ indexName, queryVector, topK?, filter?, includeVector?, minScore? })`: Search for similar vectors
- `updateVector({ indexName, id?, filter?, update })`: Update a single vector by ID or metadata filter
- `deleteVector({ indexName, id })`: Delete a single vector by ID
- `deleteVectors({ indexName, ids?, filter? })`: Delete multiple vectors by IDs or metadata filter
- `listIndexes()`: List all vector-enabled tables
- `describeIndex(indexName)`: Get table statistics and index configuration
- `deleteIndex(indexName)`: Delete a table
- `truncateIndex(indexName)`: Remove all data from a table
- `disconnect()`: Close all database connections

PostgresStore methods:

- `saveThread({ thread })`: Create or update a thread
- `getThreadById({ threadId })`: Get a thread by ID
- `updateThread({ id, title, metadata })`: Update thread title and/or metadata
- `deleteThread({ threadId })`: Delete a thread and its messages
- `listThreadsByResourceId({ resourceId, offset, limit, orderBy? })`: List paginated threads for a resource
- `saveMessages({ messages })`: Save multiple messages in a transaction
- `listMessages({ threadId, resourceId?, perPage?, page?, orderBy?, filter? })`: Get messages for a thread with pagination
- `listMessagesById({ messageIds })`: Get specific messages by their IDs
- `updateMessages({ messages })`: Update existing messages
- `deleteMessages(messageIds)`: Delete specific messages
- `getResourceById({ resourceId })`: Get a resource by ID
- `saveResource({ resource })`: Create or save a resource
- `updateResource({ resourceId, workingMemory })`: Update resource working memory
- `persistWorkflowSnapshot({ workflowName, runId, snapshot })`: Save workflow state
- `loadWorkflowSnapshot({ workflowName, runId })`: Load workflow state
- `listWorkflowRuns({ workflowName, pagination })`: List workflow runs with pagination
- `getWorkflowRunById({ workflowName, runId })`: Get a specific workflow run
- `updateWorkflowState({ workflowName, runId, state })`: Update workflow state
- `updateWorkflowResults({ workflowName, runId, results })`: Update workflow results
- `createSpan(span)`: Create a single AI span
- `batchCreateSpans({ records })`: Create multiple AI spans
- `updateSpan({ traceId, spanId, updates })`: Update an AI span
- `batchUpdateSpans({ updates })`: Update multiple AI spans
- `getTrace(traceId)`: Get a trace by ID
- `getTracesPaginated({ ...filters, pagination })`: Get paginated traces with filtering
- `batchDeleteTraces({ traceIds })`: Delete multiple traces
- `getScoreById({ id })`: Get a score by ID
- `saveScore(score)`: Save an evaluation score
- `listScoresByScorerId({ scorerId, pagination })`: List scores by scorer with pagination
- `listScoresByRunId({ runId, pagination })`: List scores by run with pagination
- `listScoresByEntityId({ entityId, entityType, pagination })`: List scores by entity with pagination
- `listScoresBySpan({ traceId, spanId, pagination })`: List scores by span with pagination

The PostgreSQL store provides comprehensive index management capabilities to optimize query performance.
PostgreSQL storage automatically creates composite indexes during initialization for common query patterns:

- `mastra_threads_resourceid_createdat_idx`: (resourceId, createdAt DESC)
- `mastra_messages_thread_id_createdat_idx`: (thread_id, createdAt DESC)
- `mastra_traces_name_starttime_idx`: (name, startTime DESC)
- `mastra_evals_agent_name_created_at_idx`: (agent_name, created_at DESC)

These indexes significantly improve performance for filtered queries with sorting.
Create additional indexes to optimize specific query patterns:
```typescript
// Basic index for common queries
await store.createIndex({
  name: 'idx_threads_resource',
  table: 'mastra_threads',
  columns: ['resourceId'],
});

// Composite index with sort order for filtering + sorting
await store.createIndex({
  name: 'idx_messages_composite',
  table: 'mastra_messages',
  columns: ['thread_id', 'createdAt DESC'],
});

// GIN index for JSONB columns (fast JSON queries)
await store.createIndex({
  name: 'idx_traces_attributes',
  table: 'mastra_traces',
  columns: ['attributes'],
  method: 'gin',
});
```
For more advanced use cases, you can also use:
- `unique: true` for unique constraints
- `where: 'condition'` for partial indexes
- `method: 'brin'` for time-series data
- `storage: { fillfactor: 90 }` for update-heavy tables
- `concurrent: true` for non-blocking creation (default)

```typescript
// List all indexes
const allIndexes = await store.listIndexes();

// List indexes for a specific table
const threadIndexes = await store.listIndexes('mastra_threads');

// Get detailed statistics for an index
const stats = await store.describeIndex('idx_threads_resource');
console.log(stats);
// {
//   name: 'idx_threads_resource',
//   table: 'mastra_threads',
//   columns: ['resourceId', 'createdAt'],
//   unique: false,
//   size: '128 KB',
//   definition: 'CREATE INDEX idx_threads_resource...',
//   method: 'btree',
//   scans: 1542,          // Number of index scans
//   tuples_read: 45230,   // Tuples read via index
//   tuples_fetched: 12050 // Tuples fetched via index
// }

// Drop an index
await store.dropIndex('idx_threads_status');
```
| Index Type | Best For | Storage | Speed |
|---|---|---|---|
| btree (default) | Range queries, sorting, general purpose | Moderate | Fast |
| hash | Equality comparisons only | Small | Very fast for = |
| gin | JSONB, arrays, full-text search | Large | Fast for contains |
| gist | Geometric data, full-text search | Moderate | Fast for nearest-neighbor |
| spgist | Non-balanced data, text patterns | Small | Fast for specific patterns |
| brin | Large tables with natural ordering | Very small | Fast for ranges |
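As a rule of thumb, the table above can be read as a small decision function. The helper below is illustrative only (the category names are invented for this sketch, and it is not part of the `@mastra/pg` API):

```typescript
// Illustrative mapping from a dominant query pattern to an index method,
// following the comparison table above. Not part of @mastra/pg.
type IndexMethod = 'btree' | 'hash' | 'gin' | 'brin';

function suggestIndexMethod(
  pattern: 'range' | 'equality' | 'jsonb' | 'timeseries'
): IndexMethod {
  switch (pattern) {
    case 'equality':
      return 'hash'; // equality comparisons only
    case 'jsonb':
      return 'gin'; // JSONB, arrays, containment queries
    case 'timeseries':
      return 'brin'; // large tables with natural ordering
    default:
      return 'btree'; // range queries, sorting, general purpose
  }
}

console.log(suggestIndexMethod('jsonb')); // gin
```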
`createIndex` accepts the following options:

- `name` (required): Index name
- `table` (required): Table name
- `columns` (required): Array of column names (may include `DESC`/`ASC`)
- `unique`: Create a unique index (default: false)
- `concurrent`: Non-blocking index creation (default: true)
- `where`: Partial index condition
- `method`: Index type (`'btree' | 'hash' | 'gin' | 'gist' | 'spgist' | 'brin'`)
- `opclass`: Operator class for GIN/GiST indexes
- `storage`: Storage parameters (e.g., `{ fillfactor: 90 }`)
- `tablespace`: Tablespace name for index placement

```typescript
// Check index usage statistics
const stats = await store.describeIndex('idx_threads_resource');

// Identify unused indexes
if (stats.scans === 0) {
  console.log(`Index ${stats.name} is unused - consider removing`);
  await store.dropIndex(stats.name);
}

// Monitor index efficiency
const efficiency = stats.tuples_fetched / stats.tuples_read;
if (efficiency < 0.5) {
  console.log(`Index ${stats.name} has low efficiency: ${efficiency}`);
}
```