Use Case: Read, write, sync, and manage database data in workflows.
Trigger → [Query/Read] → [Transform] → [Write/Update] → [Verify/Log]
Key Characteristic: Data persistence and synchronization
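Components:
1. Trigger. Options: Schedule (periodic sync, ETL, cleanup) or Webhook (event-driven writes)
2. Query/Read. Supported databases include Postgres, MySQL, MongoDB, and Redis
3. Transform. Purpose: Map between different database schemas or formats. Typical nodes: Set, Code, Merge
4. Write/Update. Operations: INSERT, UPDATE, upsert, DELETE
5. Verify/Log. Purpose: Confirm operations succeeded. Methods: check rows affected, log results, notify (Slack, email)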
Flow: Schedule → Read Source DB → Transform → Write Target DB → Log
Example (Postgres to MySQL sync):
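1. Schedule (every 15 minutes)
2. Postgres, node "Get Last Sync" (SELECT last_sync FROM sync_log)
3. Postgres (SELECT * FROM users WHERE updated_at > $1, with $1 = last_sync)
4. IF (check if records exist)
5. Set (map Postgres schema to MySQL schema)
6. MySQL (INSERT or UPDATE users)
7. Postgres (UPDATE sync_log SET last_sync = the highest updated_at in this batch)
8. Slack (notify: "Synced X users")
Store the newest synced updated_at rather than NOW(): the read is capped at 1000 rows, so NOW() would skip any rows left for the next run, as well as rows changed while the sync was running.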
Incremental sync query:
SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
LIMIT 1000
Parameters:
{
"parameters": [
"={{$node['Get Last Sync'].json.last_sync}}"
]
}
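The watermark logic behind the incremental query can be checked in plain JavaScript. This is a minimal sketch, not n8n code: `fetchChangedRows` is a hypothetical in-memory stand-in for the Postgres read, and `updated_at` values are assumed to be ISO-8601 UTC strings in one fixed format. It shows why the next `last_sync` should be the newest row actually written.

```javascript
// Hypothetical stand-in for the Postgres read:
// SELECT * FROM users WHERE updated_at > $1 ORDER BY updated_at ASC LIMIT n
function fetchChangedRows(table, since, limit) {
  return table
    .filter((row) => row.updated_at > since)
    .sort((a, b) => (a.updated_at < b.updated_at ? -1 : a.updated_at > b.updated_at ? 1 : 0))
    .slice(0, limit);
}

// One sync run: read a batch, write it, and return the new watermark.
// The watermark only advances to the newest row actually written, so rows
// cut off by LIMIT are picked up by the next run instead of being skipped.
function syncOnce(table, lastSync, limit, writeRows) {
  const rows = fetchChangedRows(table, lastSync, limit);
  if (rows.length === 0) return lastSync; // nothing new: keep the old watermark
  writeRows(rows);
  return rows[rows.length - 1].updated_at;
}

// Usage: three changed rows and a batch size of 2 take two runs to catch up.
// ISO-8601 UTC strings in one fixed format compare correctly as plain strings.
const users = [
  { id: 1, updated_at: '2024-01-01T10:00:00Z' },
  { id: 2, updated_at: '2024-01-01T11:00:00Z' },
  { id: 3, updated_at: '2024-01-01T12:00:00Z' },
];
const written = [];
let watermark = '2024-01-01T00:00:00Z';
watermark = syncOnce(users, watermark, 2, (rows) => written.push(...rows));
watermark = syncOnce(users, watermark, 2, (rows) => written.push(...rows));
console.log(written.map((r) => r.id), watermark);
```

If many rows can share one `updated_at` value, a strict `>` can still drop rows split across a batch boundary; a tie-breaker such as `(updated_at, id)` in both the ORDER BY and the watermark avoids that.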
Flow: Extract from multiple sources → Transform → Load into warehouse
Example (Consolidate data):
1. Schedule (daily at 2 AM)
2. [Parallel branches]
├─ Postgres (SELECT orders)
├─ MySQL (SELECT customers)
└─ MongoDB (find products)
3. Merge (combine all data)
4. Code (transform to warehouse schema)
5. Postgres (warehouse - INSERT into fact_sales)
6. Email (send summary report)
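The "transform to warehouse schema" step usually means joining facts to their dimensions. A minimal sketch, assuming the three sources arrive as separate arrays and using hypothetical field names (`customer_id`, `product_id`, `country`, `category`, `price`); inside an n8n Code node the arrays would come from the merged input instead.

```javascript
// Hypothetical row shapes for the three sources after the Merge step.
// Builds one fact_sales row per order by looking up its customer and product.
function buildFactSales(orders, customers, products) {
  // Index each dimension once so every lookup is a Map hit, not a scan.
  const customerById = new Map(customers.map((c) => [c.id, c]));
  const productById = new Map(products.map((p) => [p.id, p]));

  return orders.map((order) => {
    const customer = customerById.get(order.customer_id);
    const product = productById.get(order.product_id);
    return {
      order_id: order.id,
      customer_id: order.customer_id,
      customer_country: customer ? customer.country : null, // unknown customer -> null
      product_id: order.product_id,
      product_category: product ? product.category : null,
      quantity: order.quantity,
      revenue: product ? order.quantity * product.price : null, // unknown product -> null
    };
  });
}

// Usage
const facts = buildFactSales(
  [
    { id: 10, customer_id: 1, product_id: 'p1', quantity: 3 },
    { id: 11, customer_id: 2, product_id: 'p9', quantity: 1 },
  ],
  [{ id: 1, country: 'DE' }],
  [{ id: 'p1', category: 'books', price: 12.5 }],
);
console.log(facts);
```

Keeping unmatched dimensions as `null` instead of dropping the order makes missing reference data visible in the warehouse rather than silently shrinking totals.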
Flow: Schedule → Query → Validate → Update/Delete invalid records
Example (Clean invalid records):
1. Schedule (weekly)
2. Postgres (SELECT * FROM users WHERE email IS NULL OR email = '')
3. IF (invalid records exist)
4. Postgres (UPDATE users SET status = 'inactive' WHERE email IS NULL OR email = '')
5. Postgres (DELETE FROM users WHERE status = 'inactive' AND (email IS NULL OR email = '') AND created_at < NOW() - INTERVAL '1 year')
6. Slack (alert: "Cleaned X invalid records")
Flow: Schedule → Query → Export → Store
Example (Archive old records):
1. Schedule (monthly)
2. Postgres (SELECT * FROM orders WHERE created_at < $1, with $1 = a cutoff computed once, two years before the run)
3. Code (convert to JSON)
4. Write File (save to archive.json)
5. Google Drive (upload archive)
6. Postgres (DELETE FROM orders WHERE created_at < $1, same cutoff, only after the upload succeeds)
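Evaluating `NOW() - INTERVAL '2 years'` separately in the SELECT and the DELETE lets rows that cross the boundary in between be deleted without being archived. A minimal sketch of computing one cutoff and passing it to both queries; `archiveCutoff` and `buildArchivePlan` are hypothetical helpers, and the `query`/`parameters` shape follows this guide's config examples.

```javascript
// Computes one archive cutoff (`years` before `now`, in UTC) so the SELECT
// and the DELETE use exactly the same boundary.
function archiveCutoff(now, years = 2) {
  const cutoff = new Date(now.getTime());
  cutoff.setUTCFullYear(cutoff.getUTCFullYear() - years);
  return cutoff.toISOString();
}

// Hypothetical plan for the two Postgres steps; both reuse the same parameter.
function buildArchivePlan(now) {
  const cutoff = archiveCutoff(now);
  return {
    select: { query: 'SELECT * FROM orders WHERE created_at < $1', parameters: [cutoff] },
    // Run this only after the Google Drive upload has succeeded.
    delete: { query: 'DELETE FROM orders WHERE created_at < $1', parameters: [cutoff] },
  };
}

const plan = buildArchivePlan(new Date('2024-03-15T08:30:00Z'));
console.log(plan.select.parameters[0]);
```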
Flow: Webhook → Parse → Update Database
Example (Update user status):
1. Webhook (receive status update)
2. Postgres (UPDATE users SET status = $1 WHERE id = $2, parameters: ["={{$json.body.status}}", "={{$json.body.user_id}}"])
3. IF (rows affected > 0)
4. Redis (SET user:{{$json.body.user_id}}:status {{$json.body.status}})
5. Respond to Webhook ({"success": true})
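Webhook bodies are untrusted input, so it helps to validate them before the UPDATE runs. A minimal sketch, assuming numeric user ids and a hypothetical allow-list of statuses (`ALLOWED_STATUSES`); it returns the parameterized query rather than executing it.

```javascript
// Hypothetical allow-list; adjust to the statuses your users table accepts.
const ALLOWED_STATUSES = new Set(['active', 'inactive', 'suspended']);

// Validates the webhook body and builds a parameterized UPDATE.
// Values travel as parameters, never as part of the SQL text.
function buildStatusUpdate(body) {
  const userId = Number(body.user_id); // assumes numeric ids
  if (!Number.isInteger(userId) || userId <= 0) {
    throw new Error('user_id must be a positive integer');
  }
  if (!ALLOWED_STATUSES.has(body.status)) {
    throw new Error(`Unsupported status: ${body.status}`);
  }
  return {
    query: 'UPDATE users SET status = $1 WHERE id = $2',
    parameters: [body.status, userId],
  };
}

const ok = buildStatusUpdate({ user_id: '42', status: 'inactive' });
console.log(ok.parameters);
```

Rejecting bad input here lets the workflow answer with a clear error instead of a failed or, worse, unintended database write.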
{
operation: "executeQuery",
query: "SELECT id, name, email FROM users WHERE created_at > $1 LIMIT $2",
parameters: [
"={{$json.since_date}}",
"100"
]
}
{
operation: "insert",
table: "users",
columns: "id, name, email, created_at",
values: [
{
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
created_at: "={{$now}}"
}
]
}
{
operation: "update",
table: "users",
updateKey: "id",
columns: "name, email, updated_at",
values: {
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
updated_at: "={{$now}}"
}
}
{
operation: "executeQuery",
query: `
INSERT INTO users (id, name, email)
VALUES ($1, $2, $3)
ON CONFLICT (id)
DO UPDATE SET name = $2, email = $3, updated_at = NOW()
`,
parameters: [
"={{$json.id}}",
"={{$json.name}}",
"={{$json.email}}"
]
}
{
operation: "executeQuery",
query: `
SELECT u.id, u.name, o.order_id, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > ?
`,
parameters: [
"={{$json.since_date}}"
]
}
{
operation: "insert",
table: "orders",
columns: "user_id, total, status",
values: $json.orders // Array of objects
}
{
operation: "find",
collection: "users",
query: JSON.stringify({
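created_at: { $gt: new Date($json.since_date) }, // JSON.stringify turns the Date into an ISO string; unless converted, it will not match a BSON Date field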
status: "active"
}),
limit: 100
}
{
operation: "insert",
collection: "users",
document: JSON.stringify({
name: $json.name,
email: $json.email,
created_at: new Date() // JSON.stringify stores this as an ISO string, not a BSON Date, unless converted
})
}
{
operation: "update",
collection: "users",
query: JSON.stringify({ _id: $json.user_id }), // unless converted, a plain string will not match an ObjectId _id
update: JSON.stringify({
$set: {
status: $json.status,
updated_at: new Date()
}
})
}
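Use when: Writing a large result set in chunks. Split In Batches (Loop Over Items in recent n8n versions) limits each write to one batch, but everything fetched is still held in memory, so very large tables also need pagination.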
Postgres (SELECT 10000 records)
→ Split In Batches (100 items per batch)
→ Transform
→ MySQL (write batch)
→ Loop (until all processed)
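The splitting rule the batching loop applies can be sketched in plain JavaScript. This is an illustration of the same idea, not n8n's implementation; `chunk` is a hypothetical helper.

```javascript
// Splits items into consecutive batches of `size`; the last batch may be smaller.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) throw new Error('size must be a positive integer');
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 10,000 rows in batches of 100 -> 100 write calls instead of one huge one.
// Note that all 10,000 rows are still in memory at once.
const rows = Array.from({ length: 10000 }, (_, i) => ({ id: i + 1 }));
const batches = chunk(rows, 100);
console.log(batches.length, batches[0].length);
```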
Use when: Database has millions of records
Set (initialize: offset=0, limit=1000)
→ Loop Start
→ Postgres (SELECT * FROM large_table LIMIT {{$json.limit}} OFFSET {{$json.offset}})
→ IF (records returned)
├─ Process records
├─ Set (increment offset by 1000)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM large_table
ORDER BY id
LIMIT $1 OFFSET $2
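The offset loop above can be simulated against an in-memory table to check its termination and page sizes. A minimal sketch: `fetchPage` is a hypothetical stand-in for the Postgres query and assumes the array is already sorted by `id`.

```javascript
// Hypothetical stand-in for:
// SELECT * FROM large_table ORDER BY id LIMIT $1 OFFSET $2
// (`table` is assumed to be sorted by id already)
function fetchPage(table, limit, offset) {
  return table.slice(offset, offset + limit);
}

// Mirrors the loop: fetch a page, stop on an empty page, otherwise
// process it and advance the offset by the page size.
function processAllByOffset(table, limit, handlePage) {
  let offset = 0; // Set (initialize: offset=0)
  let pages = 0;
  for (;;) {
    const page = fetchPage(table, limit, offset);
    if (page.length === 0) break; // [No records] -> End
    handlePage(page);
    pages += 1;
    offset += limit; // Set (increment offset)
  }
  return pages;
}

// Usage: 2,500 rows with limit 1000 -> pages of 1000, 1000 and 500.
const largeTable = Array.from({ length: 2500 }, (_, i) => ({ id: i + 1 }));
let seen = 0;
const pageCount = processAllByOffset(largeTable, 1000, (page) => { seen += page.length; });
console.log(pageCount, seen);
```

On a live table, rows inserted or deleted during the run shift between pages, so offset pagination can skip or repeat rows; stopping when a page is shorter than `limit` also saves the final empty query.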
Keyset pagination performs better on large tables, because OFFSET makes the database read and discard every skipped row:
Set (initialize: last_id=0)
→ Loop Start
→ Postgres (SELECT * FROM large_table WHERE id > {{$json.last_id}} ORDER BY id LIMIT 1000)
→ IF (records returned)
├─ Process records
├─ Code (set last_id to the max id in this batch)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM large_table
WHERE id > $1
ORDER BY id ASC
LIMIT 1000
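The keyset loop can be simulated the same way. A minimal sketch: `fetchAfter` is a hypothetical stand-in for the query above, and ids are assumed to be positive integers; gaps in the id sequence do not matter because the loop remembers the last id seen, not a row count.

```javascript
// Hypothetical stand-in for:
// SELECT * FROM large_table WHERE id > $1 ORDER BY id ASC LIMIT $2
function fetchAfter(table, lastId, limit) {
  return table
    .filter((row) => row.id > lastId)
    .sort((a, b) => a.id - b.id)
    .slice(0, limit);
}

// Keyset pagination: remember the highest id seen instead of an offset.
function processAllByKeyset(table, limit, handlePage) {
  let lastId = 0; // Set (initialize: last_id=0); assumes positive ids
  let pages = 0;
  for (;;) {
    const page = fetchAfter(table, lastId, limit);
    if (page.length === 0) break; // [No records] -> End
    handlePage(page);
    pages += 1;
    lastId = page[page.length - 1].id; // max id of this batch (rows are sorted)
  }
  return pages;
}

// Usage: ids with gaps, two rows per page.
const sparseTable = [{ id: 3 }, { id: 7 }, { id: 8 }, { id: 20 }, { id: 21 }];
const seenIds = [];
const keysetPages = processAllByKeyset(sparseTable, 2, (page) => seenIds.push(...page.map((r) => r.id)));
console.log(keysetPages, seenIds);
```

With an index on `id`, each real query becomes a short index range scan that starts at `last_id`, which is where the speedup over OFFSET comes from.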
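For databases that support transactions (caution: BEGIN and COMMIT in separate nodes form one transaction only if every node runs on the same database connection, which n8n does not guarantee; keeping the statements in a single query is safer):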
// Node 1: Begin Transaction
{
operation: "executeQuery",
query: "BEGIN"
}
// Node 2-N: Your operations
{
operation: "executeQuery",
query: "INSERT INTO ...",
continueOnFail: true
}
// Node N+1: Commit or Rollback
{
operation: "executeQuery",
query: "={{$node['Operation'].json.error ? 'ROLLBACK' : 'COMMIT'}}"
}
Use database features for atomicity:
-- Upsert example (atomic)
INSERT INTO inventory (product_id, quantity)
VALUES ($1, $2)
ON CONFLICT (product_id)
DO UPDATE SET quantity = inventory.quantity + $2
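Manual rollback on error (compensating writes; useful when the writes span Postgres and MySQL, which no single transaction can cover):
Try Operations:
Postgres (INSERT orders)
MySQL (INSERT order_items)
On failure (an error output or continueOnFail branch in the same workflow, where order_id is still available):
Postgres (DELETE FROM orders WHERE id = $1, with $1 = order_id)
MySQL (DELETE FROM order_items WHERE order_id = ?, with ? = order_id)
An Error Trigger workflow receives details about the failed execution (workflow, error message, last node executed), not the failed item's fields, so $json.order_id is not available there.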
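The compensation idea generalizes to "undo the steps that already succeeded, newest first". A minimal sketch of that pattern in plain async JavaScript; `runWithCompensation` and the simulated steps are hypothetical and stand in for the Postgres and MySQL nodes.

```javascript
// Runs steps in order; if one fails, runs the compensations of the steps that
// already succeeded, newest first, then rethrows the original error.
async function runWithCompensation(steps) {
  const done = [];
  try {
    for (const step of steps) {
      const result = await step.run();
      done.push({ step, result });
    }
    return done.map((d) => d.result);
  } catch (err) {
    for (const { step, result } of done.reverse()) {
      await step.compensate(result); // e.g. DELETE the row the step inserted
    }
    throw err;
  }
}

// Usage: simulated "INSERT orders" succeeds, "INSERT order_items" fails.
async function demo() {
  const orders = []; // stands in for the Postgres orders table
  const steps = [
    {
      run: async () => { orders.push({ id: 1 }); return 1; },
      compensate: async (id) => {
        const i = orders.findIndex((o) => o.id === id);
        if (i !== -1) orders.splice(i, 1); // compensating DELETE
      },
    },
    {
      run: async () => { throw new Error('INSERT order_items failed'); },
      compensate: async () => {},
    },
  ];
  try {
    await runWithCompensation(steps);
    return { failed: false, remainingOrders: orders.length };
  } catch (err) {
    return { failed: true, message: err.message, remainingOrders: orders.length };
  }
}

demo().then((r) => console.log(r));
```

If a compensation itself throws, this sketch stops and surfaces that error instead of the original one; a production version would log it and alert, since the data is then left partially written.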
// Code node - map schemas
const sourceData = $input.all();
return sourceData.map(item => ({
json: {
// Source → Target mapping
user_id: item.json.id,
full_name: [item.json.first_name, item.json.last_name].filter(Boolean).join(' '), // skips null or empty name parts
email_address: item.json.email,
registration_date: new Date(item.json.created_at).toISOString(),
// Computed fields
is_premium: item.json.plan_type === 'pro',
// Default values
status: item.json.status || 'active'
}
}));
// Code node - convert data types
return $input.all().map(item => ({
json: {
// String to number
user_id: parseInt(item.json.user_id, 10),
// String to date
created_at: new Date(item.json.created_at),
// Number to boolean
is_active: item.json.active === 1,
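// JSON string to object (some drivers already return json columns as objects)
metadata: typeof item.json.metadata === 'string' ? JSON.parse(item.json.metadata || '{}') : (item.json.metadata ?? {}),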
// Null handling
email: item.json.email || null
}
}));
// Code node - aggregate data
const items = $input.all();
const summary = items.reduce((acc, item) => {
const date = new Date(item.json.created_at).toISOString().split('T')[0]; // UTC day; accepts Date objects or ISO strings
if (!acc[date]) {
acc[date] = { count: 0, total: 0 };
}
acc[date].count++;
acc[date].total += Number(item.json.amount); // numeric columns can arrive as strings
return acc;
}, {});
return Object.entries(summary).map(([date, data]) => ({
json: {
date,
count: data.count,
total: data.total,
average: data.total / data.count
}
}));
Ensure database has proper indexes:
-- Add index for sync queries
CREATE INDEX idx_users_updated_at ON users(updated_at);
-- Add index for lookups
CREATE INDEX idx_orders_user_id ON orders(user_id);
Always use LIMIT:
-- ✅ Good
SELECT * FROM large_table
WHERE created_at > $1
LIMIT 1000
-- ❌ Bad (unbounded)
SELECT * FROM large_table
WHERE created_at > $1
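Use query parameters instead of building SQL strings; the values are passed separately from the SQL text, so they cannot change the query's structure (no SQL injection):
// ✅ Good - parameterized query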
{
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
}
// ❌ Bad - string concatenation
{
query: "=SELECT * FROM users WHERE id = '{{$json.id}}'"
}
Write multiple records at once:
// ✅ Good - batch insert
{
operation: "insert",
table: "orders",
values: $json.items // Array of 100 items
}
// ❌ Bad - individual inserts in loop
// 100 separate INSERT statements
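When a node only offers raw queries, a batch insert can still be one statement with many placeholder tuples. A minimal sketch for Postgres-style `$n` placeholders; `buildBatchInsert` is a hypothetical helper, and table and column names are assumed to come from trusted code, since only values are parameterized.

```javascript
// Builds one parameterized multi-row INSERT (Postgres-style $n placeholders)
// so a batch of rows is written in a single statement.
// Table and column names are interpolated, so they must come from trusted code.
function buildBatchInsert(table, columns, rows) {
  if (rows.length === 0) throw new Error('No rows to insert');
  const parameters = [];
  const tuples = rows.map((row) => {
    const placeholders = columns.map((col) => {
      parameters.push(row[col] ?? null); // missing values become NULL
      return `$${parameters.length}`;
    });
    return `(${placeholders.join(', ')})`;
  });
  const query = `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${tuples.join(', ')}`;
  return { query, parameters };
}

const { query, parameters } = buildBatchInsert('orders', ['user_id', 'total', 'status'], [
  { user_id: 1, total: 19.99, status: 'paid' },
  { user_id: 2, total: 5, status: 'pending' },
]);
console.log(query);
// INSERT INTO orders (user_id, total, status) VALUES ($1, $2, $3), ($4, $5, $6)
console.log(parameters);
```

Postgres caps a single statement at 65,535 bind parameters, so very large batches still need to be chunked.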
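Configure connection limits in the credentials, if your credential type offers them (the field names below are illustrative):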
{
host: "db.example.com",
database: "mydb",
user: "user",
password: "pass",
// Connection pool settings
min: 2,
max: 10,
idleTimeoutMillis: 30000
}
Database Operation (UPDATE users...)
→ IF ({{$json.rowsAffected === 0}})
└─ Alert: "No rows updated - record not found"
// Database operation with continueOnFail: true
{
operation: "insert",
continueOnFail: true
}
// Next node: Check for errors
IF ({{$json.error !== undefined}})
→ IF ({{$json.error.includes('duplicate key')}})
└─ Log: "Record already exists - skipping"
→ ELSE
└─ Alert: "Database error: {{$json.error}}"
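The branching above can be kept in one place as a small classifier. A minimal sketch, assuming the failed item carries `error` either as a string or as an object with a `message` field (check what your node version emits); `classifyDbResult` is a hypothetical helper.

```javascript
// Classifies one output item from a database node run with continueOnFail.
// Assumption: `error` is either a string or an object with a `message` field.
function classifyDbResult(json) {
  if (json.error == null) return 'ok'; // no error field: the write succeeded
  const message =
    typeof json.error === 'string' ? json.error : String(json.error.message ?? '');
  // Postgres: "duplicate key value violates unique constraint ..."
  // MySQL:    "Duplicate entry '...' for key ..."
  if (message.includes('duplicate key') || message.includes('Duplicate entry')) {
    return 'skip'; // record already exists
  }
  return 'alert'; // anything else needs a human
}

console.log(classifyDbResult({ id: 1 }));
console.log(classifyDbResult({ error: 'duplicate key value violates unique constraint "users_pkey"' }));
console.log(classifyDbResult({ error: { message: 'connection refused' } }));
```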
Try Operations:
→ Database Write 1
→ Database Write 2
→ Database Write 3
Error Trigger:
→ Rollback Operations
→ Alert Admin
// ✅ SAFE - parameterized
{
query: "SELECT * FROM users WHERE email = $1",
parameters: ["={{$json.email}}"]
}
// ❌ DANGEROUS - SQL injection risk
{
query: "=SELECT * FROM users WHERE email = '{{$json.email}}'"
}
Create dedicated workflow user:
-- ✅ Good - limited permissions
CREATE USER n8n_workflow WITH PASSWORD 'secure_password';
GRANT SELECT, INSERT, UPDATE ON orders TO n8n_workflow;
GRANT SELECT ON users TO n8n_workflow;
-- ❌ Bad - too much access
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO n8n_workflow;
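// Code node (mode: Run Once for Each Item) - validate before write
// Normalize first so the checks see the same values that get written
const email = String($json.email ?? '').trim().toLowerCase();
const name = String($json.name ?? '').trim();
// Validation (a basic sanity check, not full email address parsing)
if (!email.includes('@')) {
throw new Error('Invalid email address');
}
if (name.length < 2) {
throw new Error('Invalid name');
}
// In this mode the Code node expects a single item object, not an array
return {
json: {
email,
name
}
};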
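// Code node (mode: Run Once for Each Item) - encrypt before storage
// Requires the crypto built-in to be allowed (NODE_FUNCTION_ALLOW_BUILTIN=crypto)
const crypto = require('crypto');
const algorithm = 'aes-256-cbc';
// Code nodes cannot read credentials; this reads a hex-encoded 32-byte key from an
// environment variable (DB_ENCRYPTION_KEY is a hypothetical name; env access must not be blocked)
const key = Buffer.from($env.DB_ENCRYPTION_KEY, 'hex');
const iv = crypto.randomBytes(16); // new random IV for every value
const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update($json.sensitive_data, 'utf8', 'hex');
encrypted += cipher.final('hex');
// Store the IV next to the ciphertext; decryption needs it
return {
json: {
encrypted_data: encrypted,
iv: iv.toString('hex')
}
};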
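Stored ciphertext is only useful if it can be read back. A minimal round-trip sketch with Node's `crypto` module, using the same AES-256-CBC parameters as the step above (32-byte key, random 16-byte IV, hex encoding); `encrypt` and `decrypt` are hypothetical helper names, and the key here is a throwaway generated for the demo.

```javascript
const crypto = require('crypto');

// Same scheme as the encryption step: AES-256-CBC, 32-byte key, random 16-byte IV,
// hex-encoded ciphertext stored next to its hex IV.
function encrypt(plaintext, key) {
  const iv = crypto.randomBytes(16);
  const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  const encrypted = cipher.update(plaintext, 'utf8', 'hex') + cipher.final('hex');
  return { encrypted_data: encrypted, iv: iv.toString('hex') };
}

// Reverses it when the row is read back; needs the same key and the stored IV.
function decrypt(record, key) {
  const decipher = crypto.createDecipheriv('aes-256-cbc', key, Buffer.from(record.iv, 'hex'));
  return decipher.update(record.encrypted_data, 'hex', 'utf8') + decipher.final('utf8');
}

// Usage with a throwaway key; in practice load the same 32-byte key on both sides.
const key = crypto.randomBytes(32);
const stored = encrypt('4111 1111 1111 1111', key);
console.log(decrypt(stored, key));
```

CBC provides confidentiality but no integrity check; an authenticated mode such as `aes-256-gcm` (storing the auth tag as well) would also detect tampered ciphertext.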
-- ❌ Bad: no LIMIT, could return millions of rows
SELECT * FROM large_table
-- ✅ Good
SELECT * FROM large_table
ORDER BY created_at DESC
LIMIT 1000
// ❌ Bad: value interpolated into the SQL text
query: "=SELECT * FROM users WHERE id = '{{$json.id}}'"
// ✅ Good: parameterized
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
❌ Bad: separate statements without a transaction
INSERT into orders
INSERT into order_items // Fails → orphaned order record
✅ Good: one transaction
BEGIN
INSERT into orders
INSERT into order_items
COMMIT (or ROLLBACK on error)
❌ Bad: SELECT 1000000 records → Process all → OOM error
✅ Good: SELECT records → Split In Batches (1000) → Process → Loop
From n8n template library (456 database templates):
Data Sync:
Schedule → Postgres (SELECT new records) → Transform → MySQL (INSERT)
ETL Pipeline:
Schedule → [Multiple DB reads] → Merge → Transform → Warehouse (INSERT)
Backup:
Schedule → Postgres (SELECT all) → JSON → Google Drive (upload)
Use search_templates({query: "database"}) to find more!
Key Points:
Pattern: Trigger → Query → Transform → Write → Verify
Related: