stores/mssql/README.md
Microsoft SQL Server implementation for Mastra, providing general storage capabilities with connection pooling and transaction support.
npm install @mastra/mssql
MSSQLStore supports multiple connection methods:
1. Connection String (Recommended)
import { MSSQLStore } from '@mastra/mssql';
const store = new MSSQLStore({
id: 'mssql-storage',
connectionString:
'Server=localhost,1433;Database=mastra;User Id=sa;Password=yourPassword;Encrypt=true;TrustServerCertificate=true',
});
2. Server/Port/Database Configuration
const store = new MSSQLStore({
id: 'mssql-storage',
server: 'localhost',
port: 1433,
database: 'mastra',
user: 'sa',
password: 'yourStrong(!)Password',
options: { encrypt: true, trustServerCertificate: true }, // Optional
});
const store = new MSSQLStore({
id: 'mssql-storage',
connectionString:
'Server=localhost,1433;Database=mastra;User Id=sa;Password=yourPassword;Encrypt=true;TrustServerCertificate=true',
schemaName: 'custom_schema', // Use custom schema (default: dbo)
options: {
encrypt: true,
trustServerCertificate: true,
connectTimeout: 30000,
requestTimeout: 30000,
pool: {
max: 20,
min: 0,
idleTimeoutMillis: 30000,
},
},
});
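The connection-string form and the discrete-field form carry the same information. As a rough illustration of how the fields map onto the key/value pairs of the string, here is a minimal sketch. `buildConnectionString` and `MssqlConnectionFields` are hypothetical names introduced for this example and are not part of `@mastra/mssql`; the sketch also assumes that no value contains a semicolon.

```typescript
// Hypothetical helper, not part of @mastra/mssql: shows how the discrete
// fields map onto the key=value pairs of a connection string.
interface MssqlConnectionFields {
  server: string;
  port?: number; // defaults to 1433, as in the examples above
  database: string;
  user: string;
  password: string;
  encrypt?: boolean;
  trustServerCertificate?: boolean;
}

function buildConnectionString(fields: MssqlConnectionFields): string {
  const { server, port = 1433, database, user, password } = fields;
  const { encrypt = true, trustServerCertificate = true } = fields;

  // Assumption: values contain no semicolons. Quoting such values is out of
  // scope for this sketch, so they are rejected instead.
  for (const value of [server, database, user, password]) {
    if (value.includes(';')) {
      throw new Error('Values containing ";" need quoting, which this sketch does not handle');
    }
  }

  return [
    `Server=${server},${port}`,
    `Database=${database}`,
    `User Id=${user}`,
    `Password=${password}`,
    `Encrypt=${encrypt}`,
    `TrustServerCertificate=${trustServerCertificate}`,
  ].join(';');
}
```

Called with `server: 'localhost'`, `database: 'mastra'`, `user: 'sa'` and `password: 'yourPassword'`, the helper returns the same string used in the connection-string example above.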
// Create a thread
await store.saveThread({
thread: {
id: 'thread-123',
resourceId: 'resource-456',
title: 'My Thread',
metadata: { key: 'value' },
createdAt: new Date(),
updatedAt: new Date(),
},
});
// Add messages to thread
await store.saveMessages({
messages: [
{
id: 'msg-789',
threadId: 'thread-123',
role: 'user',
type: 'text',
content: [{ type: 'text', text: 'Hello' }],
resourceId: 'resource-456',
createdAt: new Date(),
},
],
});
// Query threads and messages
const savedThread = await store.getThreadById({ threadId: 'thread-123' });
const messages = await store.listMessages({ threadId: 'thread-123' });
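The calls above assume the store's tables already exist and that the pool is closed at some point. When the store is used on its own, one way to make that explicit is a small wrapper around the `init()` and `close()` methods listed in this README. This is a sketch only: `withStore` and `StoreLifecycle` are hypothetical names, and the interface is declared locally rather than imported from `@mastra/mssql`.

```typescript
// Structural type covering only the two lifecycle methods this sketch needs;
// declared here for illustration rather than imported from @mastra/mssql.
interface StoreLifecycle {
  init(): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical helper: initialize the store, run the callback, and close the
// pool afterwards even if the callback throws. If init() itself fails,
// close() is not called.
async function withStore<S extends StoreLifecycle, T>(
  store: S,
  work: (store: S) => Promise<T>,
): Promise<T> {
  await store.init();
  try {
    return await work(store);
  } finally {
    await store.close();
  }
}
```

With this helper, a one-off lookup could be written as `await withStore(store, (s) => s.getThreadById({ threadId: 'thread-123' }))`. A long-running service would more likely call `init()` once at startup and `close()` on shutdown.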
- `id`: Unique identifier for this store instance (required)

MSSQLStore supports multiple connection methods:
Connection String
{
id: 'mssql-storage',
connectionString: 'Server=localhost,1433;Database=mastra;User Id=sa;Password=yourPassword;Encrypt=true;TrustServerCertificate=true',
}
Server/Port/Database
{
id: 'mssql-storage',
server: 'localhost',
port: 1433,
database: 'mastra',
user: 'sa',
password: 'password'
}
- `schemaName`: Custom SQL Server schema (default: dbo)
- `options.encrypt`: Enable encryption (default: true)
- `options.trustServerCertificate`: Trust self-signed certificates (default: true)
- `options.connectTimeout`: Connection timeout in milliseconds (default: 15000)
- `options.requestTimeout`: Request timeout in milliseconds (default: 15000)
- `options.pool.max`: Maximum pool connections (default: 10)
- `options.pool.min`: Minimum pool connections (default: 0)
- `options.pool.idleTimeoutMillis`: Idle connection timeout (default: 30000)

Thread and Message Management
Resources
Tracing & Observability
Workflow Management
Scoring & Evaluation
Performance & Scalability
Data Management
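The `options` defaults documented in the configuration reference earlier in this README can be read as a merge of user-supplied values over a fixed set of defaults. The sketch below spells that merge out using the documented values. `resolveOptions`, `DOCUMENTED_DEFAULTS` and the types are hypothetical names for illustration; how `MSSQLStore` applies its defaults internally is an assumption, not something this README states.

```typescript
interface PoolOptions {
  max: number;
  min: number;
  idleTimeoutMillis: number;
}

interface MssqlOptions {
  encrypt: boolean;
  trustServerCertificate: boolean;
  connectTimeout: number; // milliseconds
  requestTimeout: number; // milliseconds
  pool: PoolOptions;
}

// Every field is optional on input, including the nested pool fields.
type MssqlOptionsInput = Partial<Omit<MssqlOptions, 'pool'>> & {
  pool?: Partial<PoolOptions>;
};

// Default values as listed in this README's configuration reference.
const DOCUMENTED_DEFAULTS: MssqlOptions = {
  encrypt: true,
  trustServerCertificate: true,
  connectTimeout: 15000,
  requestTimeout: 15000,
  pool: { max: 10, min: 0, idleTimeoutMillis: 30000 },
};

// Hypothetical helper: user values win, and the nested pool object is merged
// field by field so that a partial pool config keeps the other defaults.
function resolveOptions(input: MssqlOptionsInput = {}): MssqlOptions {
  return {
    ...DOCUMENTED_DEFAULTS,
    ...input,
    pool: { ...DOCUMENTED_DEFAULTS.pool, ...input.pool },
  };
}
```

For instance, passing only `connectTimeout: 30000` and `pool: { max: 20 }` leaves `requestTimeout` at 15000 and `pool.min` at 0.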
- `init()`: Initialize the store and create tables
- `close()`: Close database connection pool
- `saveThread({ thread })`: Create or update a thread
- `getThreadById({ threadId })`: Get a thread by ID
- `updateThread({ id, title, metadata })`: Update thread title and metadata
- `deleteThread({ threadId })`: Delete a thread and its messages
- `listThreadsByResourceId({ resourceId, offset, limit, orderBy? })`: List paginated threads for a resource
- `saveMessages({ messages })`: Save multiple messages with atomic transaction
- `listMessagesById({ messageIds })`: Get messages by their IDs
- `listMessages({ threadId, resourceId?, page?, perPage?, orderBy?, filter? })`: Get paginated messages for a thread with filtering and sorting
- `updateMessages({ messages })`: Update existing messages with atomic transaction
- `deleteMessages(messageIds)`: Delete specific messages with atomic transaction
- `saveResource({ resource })`: Save a resource with working memory
- `getResourceById({ resourceId })`: Get a resource by ID
- `updateResource({ resourceId, workingMemory?, metadata? })`: Update resource working memory and metadata
- `createSpan(span)`: Create a trace span
- `updateSpan({ spanId, traceId, updates })`: Update an existing span
- `getTrace(traceId)`: Get complete trace with all spans
- `getTracesPaginated({ filters?, pagination? })`: Query traces with pagination and filters
- `batchCreateSpans({ records })`: Batch create multiple spans
- `batchUpdateSpans({ records })`: Batch update multiple spans
- `batchDeleteTraces({ traceIds })`: Batch delete traces
- `createIndex({ name, table, columns, unique?, where? })`: Create a new index
- `listIndexes(tableName?)`: List all indexes or indexes for a specific table
- `describeIndex(indexName)`: Get detailed index statistics and information
- `dropIndex(indexName)`: Drop an existing index
- `persistWorkflowSnapshot({ workflowName, runId, resourceId?, snapshot })`: Save workflow execution state
- `loadWorkflowSnapshot({ workflowName, runId })`: Load workflow execution state
- `updateWorkflowResults({ workflowName, runId, stepId, result, runtimeContext })`: Update step results (transaction + row locking)
- `updateWorkflowState({ workflowName, runId, opts })`: Update workflow run status (transaction + row locking)
- `listWorkflowRuns({ workflowName?, fromDate?, toDate?, limit?, offset?, resourceId? })`: Query workflow runs
- `getWorkflowRunById({ runId, workflowName? })`: Get specific workflow run
- `saveScore(score)`: Save evaluation score
- `getScoreById({ id })`: Get score by ID
- `listScoresByScorerId({ scorerId, pagination, entityId?, entityType?, source? })`: Get scores by scorer
- `listScoresByRunId({ runId, pagination })`: Get scores for a run
- `listScoresByEntityId({ entityId, entityType, pagination })`: Get scores for an entity
- `listScoresBySpan({ traceId, spanId, pagination })`: Get scores for a trace span
- `getTracesPaginated({ filters?, pagination? })`: Get paginated legacy traces
- `batchTraceInsert({ records })`: Batch insert legacy trace records
- `getEvals({ agentName?, type?, page?, perPage? })`: Get paginated evaluations
- `createTable({ tableName, schema })`: Create a new table
- `alterTable({ tableName, schema, ifNotExists })`: Add columns to existing table
- `clearTable({ tableName })`: Remove all rows from a table
- `dropTable({ tableName })`: Drop a table
- `insert({ tableName, record })`: Insert a single record
- `batchInsert({ tableName, records })`: Batch insert multiple records
- `load<R>({ tableName, keys })`: Load a record by key(s)

The MSSQL store provides comprehensive index management capabilities to optimize query performance.
MSSQL storage automatically creates composite indexes during initialization for common query patterns. These indexes significantly improve performance for filtered queries with sorting.
// Basic index for common queries
await store.createIndex({
name: 'idx_threads_resource',
table: 'mastra_threads',
columns: ['resourceId'],
});
// Composite index with sort order for filtering + sorting
await store.createIndex({
name: 'idx_messages_composite',
table: 'mastra_messages',
columns: ['thread_id', 'seq_id DESC'],
});
// Unique index for constraints
await store.createIndex({
name: 'idx_unique_constraint',
table: 'mastra_resources',
columns: ['id'],
unique: true,
});
// Filtered index (partial indexing)
await store.createIndex({
name: 'idx_active_threads',
table: 'mastra_threads',
columns: ['resourceId'],
where: "status = 'active'",
});
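To make the `createIndex` options above more concrete, the following sketch renders the kind of T-SQL statement such a spec plausibly corresponds to. It is an illustration only: `renderCreateIndexSql`, `quoteIdent` and `IndexSpec` are hypothetical names, and the statement `MSSQLStore` actually issues may differ.

```typescript
// Hypothetical shape mirroring the createIndex options used in this README.
interface IndexSpec {
  name: string;
  table: string;
  columns: string[]; // e.g. 'thread_id' or 'seq_id DESC'
  unique?: boolean;
  where?: string; // filter predicate, passed through verbatim
}

// Bracket-quote an identifier, doubling any closing bracket inside it.
function quoteIdent(identifier: string): string {
  return `[${identifier.replace(/\]/g, ']]')}]`;
}

// Sketch only: builds a CREATE INDEX statement from the spec. Each column
// entry is split into a name and an optional sort direction (default ASC).
function renderCreateIndexSql(spec: IndexSpec, schemaName = 'dbo'): string {
  const columnList = spec.columns
    .map((column) => {
      const [columnName = '', direction = 'ASC'] = column.trim().split(/\s+/);
      return `${quoteIdent(columnName)} ${direction.toUpperCase()}`;
    })
    .join(', ');
  const unique = spec.unique ? 'UNIQUE ' : '';
  const filter = spec.where ? ` WHERE ${spec.where}` : '';
  return (
    `CREATE ${unique}NONCLUSTERED INDEX ${quoteIdent(spec.name)} ` +
    `ON ${quoteIdent(schemaName)}.${quoteIdent(spec.table)} (${columnList})${filter};`
  );
}
```

Under these assumptions, the composite example maps to an index on `[thread_id] ASC, [seq_id] DESC`, and the `where` option becomes the predicate of a filtered index.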
// List all indexes
const allIndexes = await store.listIndexes();
// List indexes for specific table
const threadIndexes = await store.listIndexes('mastra_threads');
// Get detailed statistics for an index
const stats = await store.describeIndex('idx_threads_resource');
console.log(stats);
// {
// name: 'idx_threads_resource',
// table: 'mastra_threads',
// columns: ['resourceId'],
// unique: false,
// size: '128 KB',
// method: 'nonclustered',
// scans: 1542, // Number of index seeks
// tuples_read: 45230, // Tuples read via index
// tuples_fetched: 12050 // Tuples fetched via index
// }
// Drop an index
await store.dropIndex('idx_threads_resource');
// Check index usage statistics
const stats = await store.describeIndex('idx_threads_resource');
// Identify unused indexes
if (stats.scans === 0) {
console.log(`Index ${stats.name} is unused - consider removing`);
await store.dropIndex(stats.name);
}
// Monitor index efficiency
const efficiency = stats.tuples_fetched / stats.tuples_read;
if (efficiency < 0.5) {
console.log(`Index ${stats.name} has low efficiency: ${efficiency}`);
}
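The two checks above can be folded into one small classification function, which also avoids dividing by zero when an index has been scanned but no tuples have been read. This is a sketch under assumptions: `assessIndex`, `IndexUsageStats` and `IndexVerdict` are hypothetical names, the stats shape is the subset of the `describeIndex` output shown earlier, and the 0.5 threshold is simply the one used in the example.

```typescript
// Subset of the describeIndex() result shown in this README.
interface IndexUsageStats {
  name: string;
  scans: number;
  tuples_read: number;
  tuples_fetched: number;
}

type IndexVerdict = 'unused' | 'low-efficiency' | 'ok';

// Hypothetical helper combining the unused and efficiency checks.
function assessIndex(stats: IndexUsageStats, minEfficiency = 0.5): IndexVerdict {
  if (stats.scans === 0) return 'unused';
  // Nothing read yet: the ratio is undefined, so do not flag the index.
  if (stats.tuples_read === 0) return 'ok';
  const efficiency = stats.tuples_fetched / stats.tuples_read;
  return efficiency < minEfficiency ? 'low-efficiency' : 'ok';
}
```

A caller could log the verdict and leave the decision to drop an index to a person, since usage counters may not yet reflect a full workload cycle.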