src/backend/base/langflow/alembic/DB-MIGRATION-GUIDE.MD
This guide outlines our approach to database migrations in a multi-service architecture where multiple services with different versions share the same database. We follow the Expand-Contract Pattern to ensure zero-downtime deployments and maintain N-1 version compatibility.
The Expand-Contract pattern consists of three phases:
flowchart LR
A[Current Schema]
B["**EXPAND: Add New Schema**"]
C["**MIGRATE: Transition Data**"]
D["**CONTRACT: Remove Old Schema**"]
A --> B
B --> C
C --> D
%% basic neutral colors
style B fill:#f2f2f2,stroke:#333,stroke-width:1.5px,color:#000
style C fill:#d9eaff,stroke:#333,stroke-width:1.5px,color:#000
style D fill:#d5f5d5,stroke:#333,stroke-width:1.5px,color:#000
Diagram Description: The Expand-Contract migration pattern consists of three phases:
Assuming an N-day contract cycle:
| Phase | Duration | Description |
|---|---|---|
| Expand | Day 1 | Add new schema elements (backward compatible) |
| Migrate | Days 2-N | Update services, backfill data, monitor adoption |
| Contract | Day N+1 | Remove deprecated schema (only after full adoption) |
Goal: Add new schema elements without breaking existing services
def upgrade() -> None:
    """
    EXPAND PHASE: Add new schema elements as nullable/optional.

    Safe to run against a live system: the new column is nullable, so
    services that predate this migration keep inserting rows unchanged.
    """
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('table_name')]

    # Always check existence to ensure idempotency
    if 'new_column' not in columns:
        # CRITICAL: Use nullable=True for backward compatibility
        op.add_column('table_name', sa.Column('new_column', sa.String(), nullable=True))

    # Optional: Add index for performance.
    # Checked independently of the column check so a previous run that added
    # the column but failed before indexing can be safely re-run.
    index_names = [ix['name'] for ix in inspector.get_indexes('table_name')]
    if 'ix_table_new_column' not in index_names:
        op.create_index('ix_table_new_column', 'table_name', ['new_column'])
Service Compatibility:
Goal: Gradually transition all services and data to use new schema
Database Operations:
-- Backfill existing data if needed
UPDATE table_name
SET new_column = old_column
WHERE new_column IS NULL AND old_column IS NOT NULL;
-- Monitor adoption
SELECT
COUNT(*) FILTER (WHERE new_column IS NOT NULL) as using_new,
COUNT(*) FILTER (WHERE new_column IS NULL) as not_using_new
FROM table_name
WHERE created_at > NOW() - INTERVAL '1 day';
Service Code Pattern:
class ServiceAdapter:
    """Read/write adapter that tolerates both old and new schema versions."""

    def read_data(self, row):
        """Handle both old and new schema.

        Uses ``getattr`` consistently so rows from services that predate the
        EXPAND migration (no ``new_column`` attribute at all) do not raise
        AttributeError, and distinguishes "absent/NULL" from falsy-but-present
        values such as an empty string.
        """
        new_value = getattr(row, 'new_column', None)
        return {
            'id': row.id,
            # Gracefully handle missing columns
            'new_field': new_value,
            # Fall back to the old column only when the new one is absent or
            # NULL (a plain ``or`` would also discard '' and 0).
            'data': new_value if new_value is not None else row.old_column,
        }

    def write_data(self, data):
        """Write to both old and new schema during transition (dual-write)."""
        return Model(
            old_column=data,  # Keep writing to old column
            new_column=data,  # Also write to new column
        )
Goal: Remove deprecated schema elements after all services have migrated
Prerequisites Checklist:
def upgrade_contract_phase() -> None:
    """
    CONTRACT PHASE: Remove old schema (only after full migration).

    Raises:
        RuntimeError: if any rows still depend on ``old_column``; the drop
            is aborted so no data is lost.
    """
    # Verify no services are using old column
    bind = op.get_bind()

    # Safety check: Log usage before removal
    result = bind.execute(sa.text("""
        SELECT COUNT(*) as cnt FROM table_name
        WHERE old_column IS NOT NULL AND new_column IS NULL
    """)).first()

    if result and result.cnt > 0:
        # RuntimeError rather than bare Exception so callers can catch it
        # specifically; it is still an Exception subclass, so existing
        # broad handlers keep working.
        raise RuntimeError(f"Cannot contract: {result.cnt} rows still depend on old_column")

    # Safe to remove
    op.drop_column('table_name', 'old_column')
✅ DO:
# Always nullable for new columns
op.add_column('message', sa.Column('context_id', sa.String(), nullable=True))
# Add default value if needed (database-level)
op.add_column('message', sa.Column('status', sa.String(),
nullable=True, server_default='pending'))
❌ DON'T:
# Never add required columns directly
op.add_column('message', sa.Column('context_id', sa.String(), nullable=False))
✅ DO:
❌ DON'T:
✅ DO:
❌ DON'T:
# Never rename directly - breaks old services
op.alter_column('table_name', 'old_name', new_column_name='new_name')
✅ DO:
def pre_migration_checks():
    """Run before applying migration.

    Emits warnings only; does not abort the migration. The second query
    reads ``pg_stat_activity``, so this helper is PostgreSQL-specific.
    """
    bind = op.get_bind()

    # Check table size for performance impact. Guard ``result`` before
    # dereferencing, matching the ``if result and ...`` pattern used by the
    # contract-phase safety check in this guide.
    result = bind.execute(sa.text(
        "SELECT COUNT(*) as cnt FROM table_name"
    )).first()
    if result and result.cnt > 1_000_000:
        print(f"WARNING: Large table ({result.cnt} rows) - migration may be slow")

    # Check for running transactions
    result = bind.execute(sa.text("""
        SELECT COUNT(*) FROM pg_stat_activity
        WHERE state = 'active' AND query_start < NOW() - INTERVAL '5 minutes'
    """)).first()
    if result and result[0] > 0:
        print(f"WARNING: {result[0]} long-running transactions detected")
def post_migration_validation():
    """Verify migration succeeded.

    Raises:
        RuntimeError: if the expected column was not added.
    """
    bind = op.get_bind()
    inspector = sa.inspect(bind)

    # Verify column exists. Use an explicit raise rather than ``assert``:
    # asserts are stripped when Python runs with -O, which would silently
    # disable this validation.
    columns = [col['name'] for col in inspector.get_columns('table_name')]
    if 'new_column' not in columns:
        raise RuntimeError("Migration failed: column not added")

    # Verify data integrity
    result = bind.execute(sa.text("""
        SELECT COUNT(*) FROM table_name
        WHERE new_column IS NOT NULL
    """)).first()
    print(f"Migration complete: {result[0]} rows have new_column populated")
Before rolling back, verify:
def downgrade() -> None:
    """
    Safe rollback with data preservation.

    Any non-NULL ``new_column`` values are copied into ``table_name_backup``
    before the column is dropped.
    """
    bind = op.get_bind()
    inspector = sa.inspect(bind)

    # Check for data that would be lost
    result = bind.execute(sa.text("""
        SELECT COUNT(*) as cnt FROM table_name
        WHERE new_column IS NOT NULL
    """)).first()

    if result and result.cnt > 0:
        # Backup data before dropping. Create the (empty) backup table, then
        # INSERT: ``CREATE TABLE IF NOT EXISTS ... AS SELECT`` runs its
        # SELECT only when the table is first created, so on a repeated
        # rollback it would silently back up nothing.
        bind.execute(sa.text("""
            CREATE TABLE IF NOT EXISTS table_name_backup AS
            SELECT id, new_column, NOW() as backed_up_at
            FROM table_name WHERE 1 = 0
        """))
        bind.execute(sa.text("""
            INSERT INTO table_name_backup
            SELECT id, new_column, NOW() as backed_up_at
            FROM table_name WHERE new_column IS NOT NULL
        """))
        print(f"Backed up {result.cnt} rows to table_name_backup")

    # Safe to drop column
    columns = [col['name'] for col in inspector.get_columns('table_name')]
    if 'new_column' in columns:
        op.drop_column('table_name', 'new_column')
# Check existence before adding/dropping
if 'column_name' not in columns:
op.add_column(...)
if 'column_name' in columns:
op.drop_column(...)
"""
Migration: Add context_id to message table
Phase: EXPAND
Safe to rollback: YES
Services compatible: All versions
Next phase: MIGRATE after all services deployed
Revision ID: 182e5471b900
"""
class MessageService:
    """Message processing with a feature-flag-gated migration path."""

    def process_message(self, message):
        """Dispatch to the context-aware or legacy path per the rollout flag."""
        handler = (
            self._process_with_context  # New logic using context_id
            if feature_flags.is_enabled('use_context_id')
            else self._process_legacy   # Old logic without context_id
        )
        return handler(message)
-- Create monitoring view
CREATE VIEW migration_progress AS
SELECT
    'context_id_adoption' as migration,
    -- NULLIF guards the division: if no rows were created in the window,
    -- COUNT(*) is 0 and the view would otherwise raise "division by zero".
    COUNT(*) FILTER (WHERE context_id IS NOT NULL) * 100.0 / NULLIF(COUNT(*), 0) as percentage_complete,
    COUNT(*) as total_records,
    MAX(updated_at) as last_update
FROM message
WHERE created_at > NOW() - INTERVAL '7 days';
# DON'T: This breaks existing services
op.alter_column('message', 'content', nullable=False)
# DON'T: Remove old schema in same migration as adding new
def upgrade():
op.add_column('table', sa.Column('new_col', ...))
op.drop_column('table', 'old_col') # Services still using this!
# DON'T: Direct type change can fail or corrupt data
op.alter_column('table', 'amount', type_=sa.Integer()) # Was String
# DON'T: Assume all services update simultaneously
if datetime.now() > deployment_date:
op.drop_column('table', 'old_column') # Some services might be delayed!
# Migration 1: EXPAND (Day 1)
def upgrade_expand():
    """EXPAND: add a nullable column with a DB-level default; old writers are unaffected."""
    verified_column = sa.Column(
        'email_verified',
        sa.Boolean(),
        nullable=True,
        server_default='false',
    )
    op.add_column('user', verified_column)
# Migration 2: MIGRATE (Day 30, after all services updated)
def upgrade_migrate():
    """MIGRATE: backfill rows created before the default existed."""
    # ``user`` is a reserved word in PostgreSQL, so raw SQL must quote it
    # (op.add_column quotes identifiers automatically; op.execute does not).
    op.execute('UPDATE "user" SET email_verified = false WHERE email_verified IS NULL')
# Migration 3: CONTRACT (Day 60, after verification)
def upgrade_contract():
    # Tighten to NOT NULL only after the MIGRATE backfill removed every NULL;
    # otherwise this ALTER fails on existing rows.
    op.alter_column('user', 'email_verified', nullable=False)
# Migration 1: Add new column
def upgrade_phase1():
    """EXPAND: add integer status_code alongside the text status column."""
    op.add_column('order', sa.Column('status_code', sa.Integer(), nullable=True))

    # Copy data from old column. ``order`` is a reserved SQL keyword, so the
    # raw statement must quote the table name (op.add_column above quotes
    # identifiers automatically; op.execute does not).
    op.execute("""
        UPDATE "order" SET status_code =
            CASE status_text
                WHEN 'pending' THEN 1
                WHEN 'processing' THEN 2
                WHEN 'complete' THEN 3
                ELSE 0
            END
    """)
# Migration 2: Remove old column (after transition period)
def upgrade_phase2():
    # CONTRACT: run only after every service reads status_code; this is
    # destructive and discards the original text values.
    op.drop_column('order', 'status_text')
-- Track which services are using new schema
CREATE TABLE service_schema_usage (
    -- PRIMARY KEY is required: the ON CONFLICT (service_name) upsert below
    -- needs a unique constraint on this column as its conflict target.
    service_name VARCHAR(100) PRIMARY KEY,
    schema_version VARCHAR(50),
    last_seen TIMESTAMP DEFAULT NOW(),
    uses_new_schema BOOLEAN DEFAULT FALSE
);

-- Update from application (upsert keyed on service_name)
INSERT INTO service_schema_usage (service_name, schema_version, uses_new_schema)
VALUES ('user-service', 'v2.1.0', true)
ON CONFLICT (service_name)
DO UPDATE SET
    schema_version = EXCLUDED.schema_version,
    last_seen = NOW(),
    uses_new_schema = EXCLUDED.uses_new_schema;
-- Check migration adoption rate
SELECT
migration_name,
adopted_services,
total_services,
(adopted_services * 100.0 / total_services) as adoption_percentage,
days_since_deployment
FROM migration_tracking
WHERE is_active = true
ORDER BY days_since_deployment DESC;
```sql
-- Identify services not yet migrated
SELECT
s.service_name,
s.version,
s.last_deployment,
m.migration_name
FROM services s
CROSS JOIN active_migrations m
WHERE NOT EXISTS (
SELECT 1 FROM service_migrations sm
WHERE sm.service_name = s.service_name
AND sm.migration_name = m.migration_name
);
Following the Expand-Contract pattern ensures:
Remember: When in doubt, expand first, migrate slowly, and contract only when certain.