docs/ARCHITECTURE.md
Litestream follows a layered architecture with clear separation of concerns:
graph TB
subgraph "Application Layer"
CLI[CLI Commands
cmd/litestream/]
Config[Configuration
config.go]
end
subgraph "Core Layer"
Store[Store Manager
store.go]
DB[Database Manager
db.go]
Replica[Replica Manager
replica.go]
end
subgraph "Storage Abstraction"
RC[ReplicaClient Interface
replica_client.go]
end
subgraph "Storage Implementations"
S3[s3/replica_client.go]
GCS[gs/replica_client.go]
ABS[abs/replica_client.go]
OSS[oss/replica_client.go]
File[file/replica_client.go]
SFTP[sftp/replica_client.go]
NATS[nats/replica_client.go]
end
subgraph "External"
SQLite[SQLite Database]
Cloud[Cloud Storage]
end
CLI --> Store
Store --> DB
DB --> Replica
Replica --> RC
RC --> S3
RC --> GCS
RC --> ABS
RC --> OSS
RC --> File
RC --> SFTP
RC --> NATS
DB <--> SQLite
S3 --> Cloud
GCS --> Cloud
ABS --> Cloud
OSS --> Cloud
Two supporting components, the HTTP control server (server.go) and the leaser (leaser.go, s3/leaser.go), are described later in this document.

The DB component is the heart of Litestream, managing a single SQLite database:
type DB struct {
// Core fields
path string // Database file path
metaPath string // Metadata directory path
db *sql.DB // SQLite connection
f *os.File // Long-running file descriptor
rtx *sql.Tx // Long-running read transaction
pageSize int // Database page size
// Synchronization
mu sync.RWMutex // Protects struct fields
chkMu sync.RWMutex // Checkpoint lock
notify chan struct{} // WAL change notifications
// Lifecycle
ctx context.Context
cancel func()
wg sync.WaitGroup
// Configuration
MinCheckpointPageN int // Min pages for passive checkpoint
TruncatePageN int // Pages before emergency truncate checkpoint
CheckpointInterval time.Duration // Time-based passive checkpoint interval
MonitorInterval time.Duration // WAL monitoring frequency
// Note: MaxCheckpointPageN removed (RESTART mode disabled due to #724)
// Metrics
dbSizeGauge prometheus.Gauge
walSizeGauge prometheus.Gauge
txIDGauge prometheus.Gauge
}
// Lifecycle
func (db *DB) Open() error
func (db *DB) Close(ctx context.Context) error
// Monitoring
func (db *DB) monitor() // Background WAL monitoring
func (db *DB) checkWAL() (bool, error) // Check for WAL changes
// Checkpointing
func (db *DB) Checkpoint(mode string) error
func (db *DB) autoCheckpoint() error
// Replication
func (db *DB) WALReader(pgno uint32) (io.ReadCloser, error)
func (db *DB) Sync(ctx context.Context) error
// Compaction
func (db *DB) Compact(ctx context.Context, destLevel int) (*ltx.FileInfo, error)
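For orientation, here is a minimal sketch of driving a single DB directly as a library. It assumes a litestream.NewDB constructor alongside the fields and methods listed above; exact signatures may vary between versions.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

func main() {
	db := litestream.NewDB("/var/lib/app/app.db")
	db.MonitorInterval = time.Second    // how often the WAL is polled
	db.CheckpointInterval = time.Minute // time-based passive checkpoint

	if err := db.Open(); err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Copy any pending WAL frames into LTX files.
	if err := db.Sync(ctx); err != nil {
		log.Fatal(err)
	}

	if err := db.Close(ctx); err != nil {
		log.Fatal(err)
	}
}
```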
The Replica component manages replication to a single destination:
type Replica struct {
db *DB // Parent database
Client ReplicaClient // Storage backend client
mu sync.RWMutex
pos ltx.Pos // Current replication position
// Configuration
SyncInterval time.Duration
MonitorEnabled bool
// Lifecycle
cancel func()
wg sync.WaitGroup
}
type Pos struct {
TXID TXID // Transaction ID
PageNo uint32 // Page number within transaction
Checksum uint64 // Running checksum
}
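A hedged sketch of wiring a replica to a DB using the local-filesystem backend. The constructors (litestream.NewReplica, file.NewReplicaClient) and the db.Replica field are assumptions based on the package layout and field names shown in this document.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/file"
)

func main() {
	db := litestream.NewDB("/var/lib/app/app.db")

	replica := litestream.NewReplica(db)
	replica.Client = file.NewReplicaClient("/mnt/backup/app") // local-filesystem backend
	replica.SyncInterval = time.Second                        // push new LTX files every second

	db.Replica = replica // the DB's monitor loop drives this replica

	if err := db.Open(); err != nil {
		log.Fatal(err)
	}
	defer db.Close(context.Background())
}
```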
The Store component coordinates multiple databases and manages system-wide resources:
type Store struct {
mu sync.Mutex
dbs []*DB
levels CompactionLevels
// Configuration
SnapshotInterval time.Duration
SnapshotRetention time.Duration
L0Retention time.Duration
L0RetentionCheckInterval time.Duration
CompactionMonitorEnabled bool
// Lifecycle
ctx context.Context
cancel func()
wg sync.WaitGroup
}
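A sketch of assembling a Store over several databases. NewStore, the context-taking Open, and Close are assumptions; the option fields and DefaultCompactionLevels follow definitions elsewhere in this document.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

func main() {
	dbs := []*litestream.DB{
		litestream.NewDB("/var/lib/app/users.db"),
		litestream.NewDB("/var/lib/app/orders.db"),
	}

	store := litestream.NewStore(dbs, litestream.DefaultCompactionLevels)
	store.SnapshotInterval = 24 * time.Hour // take a full snapshot daily
	store.L0Retention = time.Hour           // keep raw L0 files for an hour

	if err := store.Open(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer store.Close()
}
```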
Litestream exposes a Unix socket HTTP server for runtime control (server.go).
type SocketConfig struct {
Enabled bool `yaml:"enabled"` // Default: false
Path string `yaml:"path"` // Default: "/var/run/litestream.sock"
Permissions uint32 `yaml:"permissions"` // Default: 0600
}
All endpoints are served over the Unix socket via Go's net/http mux:
| Method | Path | Description |
|---|---|---|
| POST | /register | Add a database at runtime |
| POST | /unregister | Remove a database at runtime |
| GET | /txid?path= | Get current transaction ID for a database |
| GET | /list | List all managed databases |
| GET | /info | Server info (version, PID, uptime, database count) |
| POST | /start | Start replication for a database |
| POST | /stop | Stop replication for a database |
| GET | /debug/pprof/* | Standard Go pprof endpoints |
type RegisterDatabaseRequest struct {
Path string `json:"path"`
ReplicaURL string `json:"replica_url"`
}
type UnregisterDatabaseRequest struct {
Path string `json:"path"`
Timeout int `json:"timeout,omitempty"` // Seconds
}
type TXIDResponse struct {
TXID uint64 `json:"txid"`
}
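As an illustration, the control API can be called from Go by dialing the Unix socket directly. The host in the URL below is a placeholder, because the custom dialer always connects to the documented default socket path.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"log"
	"net"
	"net/http"
)

func main() {
	httpc := http.Client{
		Transport: &http.Transport{
			// Route every request to the control socket, ignoring the URL host.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", "/var/run/litestream.sock")
			},
		},
	}

	body, err := json.Marshal(map[string]string{
		"path":        "/var/lib/app/app.db",
		"replica_url": "s3://mybucket/app",
	})
	if err != nil {
		log.Fatal(err)
	}

	resp, err := httpc.Post("http://litestream/register", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("register:", resp.Status)
}
```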
The Leaser interface (leaser.go) enables distributed coordination so multiple Litestream instances can safely share a replica destination.
type Leaser interface {
Type() string
AcquireLease(ctx context.Context) (*Lease, error)
RenewLease(ctx context.Context, lease *Lease) (*Lease, error)
ReleaseLease(ctx context.Context, lease *Lease) error
}
type Lease struct {
Generation int64 `json:"generation"`
ExpiresAt time.Time `json:"expires_at"`
Owner string `json:"owner,omitempty"`
ETag string `json:"-"`
}
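Before the backend-specific notes below, a minimal sketch of holding a lease from application code using only this interface. Renewing at half the TTL is a convention assumed here, not something the interface mandates.

```go
package example

import (
	"context"
	"time"

	"github.com/benbjohnson/litestream"
)

// holdLease acquires a lease and keeps renewing it at half the TTL until
// ctx is cancelled, then releases it.
func holdLease(ctx context.Context, l litestream.Leaser) error {
	lease, err := l.AcquireLease(ctx)
	if err != nil {
		return err // e.g. a LeaseExistsError when another instance holds it
	}
	defer func() { l.ReleaseLease(context.Background(), lease) }()

	ticker := time.NewTicker(15 * time.Second) // half of the 30s default TTL
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if lease, err = l.RenewLease(ctx, lease); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```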
The S3 implementation (s3/leaser.go):
- Defaults: DefaultLeaseTTL = 30s, DefaultLeasePath = "lock.json".
- Owner identity is hostname:pid (falls back to pid-N if the hostname is unavailable).
- Acquisition uses conditional If-Match/If-None-Match headers on S3 PutObject to prevent races.
- Release uses DeleteObject with If-Match to ensure only the holder can release.
- Errors: ErrLeaseNotHeld, ErrLeaseAlreadyReleased, and LeaseExistsError{Owner, ExpiresAt}.
- If the lease object (lock.json) is already held and unexpired, acquisition returns LeaseExistsError.
- A first acquire sends If-None-Match: *; taking over an expired lease sends If-Match: <etag>.
- A PreconditionFailed (412) response means another instance acquired the lease first.

These methods on DB (db.go) simplify common operations when Litestream is used as a library:
type SyncStatus struct {
LocalTXID ltx.TXID // Local transaction ID
RemoteTXID ltx.TXID // Remote transaction ID
InSync bool // true if LocalTXID > 0 and equal to RemoteTXID
}
func (db *DB) SyncStatus(ctx context.Context) (SyncStatus, error)
func (db *DB) SyncAndWait(ctx context.Context) error
func (db *DB) EnsureExists(ctx context.Context) error
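A short sketch of these helpers in use when embedding Litestream as a library (method signatures as listed above):

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

// waitForReplica blocks until the replica has caught up, then logs the result.
func waitForReplica(db *litestream.DB) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := db.SyncAndWait(ctx); err != nil { // WAL -> LTX -> remote
		return err
	}

	status, err := db.SyncStatus(ctx)
	if err != nil {
		return err
	}
	log.Printf("local=%d remote=%d in_sync=%v", status.LocalTXID, status.RemoteTXID, status.InSync)
	return nil
}
```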
In summary:
- SyncAndWait runs db.Sync() (WAL to LTX) and then db.Replica.Sync() (LTX to remote), blocking until both complete.
- EnsureExists ensures the database file exists (see Open()).

LTX (Log Transaction) files are immutable files containing database changes:
+------------------+
| Header | Fixed size header with metadata
+------------------+
| |
| Page Frames | Variable number of page frames
| |
+------------------+
| Page Index | Index for efficient page lookup
+------------------+
| Trailer | Metadata and checksums
+------------------+
type Header struct {
Magic [4]byte // "LTX\x00"
Version uint32 // Format version
PageSize uint32 // Database page size
MinTXID TXID // Starting transaction ID
MaxTXID TXID // Ending transaction ID
Timestamp int64 // Creation timestamp
Checksum uint64 // Header checksum
}
type PageFrame struct {
Header PageHeader
Data []byte // Page data (pageSize bytes)
}
type PageHeader struct {
PageNo uint32 // Page number in database
Size uint32 // Size of page data
Checksum uint64 // Page checksum
}
The page index is binary-searched for efficient page lookup:
type PageIndexElem struct {
PageNo uint32 // Page number
Offset int64 // Offset in file
Size uint32 // Size of page frame
}
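A sketch of the lookup this index enables, assuming the entries are stored sorted by page number; the struct below simply mirrors the definition above to keep the example self-contained.

```go
package example

import "sort"

// PageIndexElem mirrors the index entry shown above.
type PageIndexElem struct {
	PageNo uint32
	Offset int64
	Size   uint32
}

// findPage returns the entry for pgno via binary search over an index
// sorted by page number, or false if the page is absent from this file.
func findPage(index []PageIndexElem, pgno uint32) (PageIndexElem, bool) {
	i := sort.Search(len(index), func(i int) bool { return index[i].PageNo >= pgno })
	if i < len(index) && index[i].PageNo == pgno {
		return index[i], true // Offset/Size locate the frame within the file
	}
	return PageIndexElem{}, false
}
```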
type Trailer struct {
PageIndexOffset int64 // Offset to page index
PageIndexSize int64 // Size of page index
PageCount uint32 // Total pages in file
Checksum uint64 // Full file checksum
}
func (db *DB) monitor() {
ticker := time.NewTicker(db.MonitorInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Check WAL for changes
changed, err := db.checkWAL()
if err != nil {
slog.Error("wal check failed", "error", err)
continue
}
if changed {
// Notify replicas of changes
db.notifyReplicas()
// Check if checkpoint needed
if db.shouldCheckpoint() {
db.autoCheckpoint()
}
}
case <-db.ctx.Done():
return
}
}
}
func (db *DB) checkWAL() (bool, error) {
// Get current WAL state
walInfo, err := db.walInfo()
if err != nil {
return false, err
}
// Compare with previous state
db.mu.Lock()
changed := walInfo.Size != db.prevWALSize ||
walInfo.Checksum != db.prevWALChecksum
db.prevWALSize = walInfo.Size
db.prevWALChecksum = walInfo.Checksum
db.mu.Unlock()
return changed, nil
}
Compaction merges multiple LTX files to reduce storage overhead:
High-level compaction flow:
1. The Store decides when a level is due (Store.shouldCompact).
2. It enumerates the level L-1 files using ReplicaClient.LTXFiles, preferring local copies via os.Open(db.LTXPath(...)) and falling back to ReplicaClient.OpenLTXFile only when necessary.
3. The source files are merged with ltx.NewCompactor, which performs page-level deduplication and enforces lock-page skipping automatically.
4. The merged output is written via ReplicaClient.WriteLTXFile to create the LTX file for level L.
5. ltx.FileInfo.CreatedAt is set to the earliest timestamp from the source files so point-in-time recovery remains accurate.

type CompactionLevel struct {
Level int // Level number (0 = raw, 1+ = compacted)
Interval time.Duration // How often to compact from previous level
}
// Default configuration
var DefaultCompactionLevels = CompactionLevels{
{Level: 0, Interval: 0}, // Raw LTX files
{Level: 1, Interval: 1 * time.Hour}, // Hourly compaction
{Level: 2, Interval: 24 * time.Hour}, // Daily compaction
}
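A sketch of how a caller might drive per-level compaction using DB.Compact from the method list earlier; the Store's real scheduling (Store.shouldCompact and per-level deadlines) is replaced here by a plain ticker.

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

// compactLevel periodically merges level L-1 files into destLevel.
func compactLevel(ctx context.Context, db *litestream.DB, destLevel int, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			info, err := db.Compact(ctx, destLevel)
			if err != nil {
				log.Printf("compaction to L%d failed: %v", destLevel, err)
				continue
			}
			log.Printf("compacted to L%d (created at %v)", destLevel, info.CreatedAt)
		case <-ctx.Done():
			return
		}
	}
}
```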
Litestream maintains a long-running read transaction to ensure consistency:
func (db *DB) initReadTx() error {
// Start read transaction
tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{
ReadOnly: true,
})
if err != nil {
return err
}
// Execute dummy query to start transaction
var dummy string
err = tx.QueryRow("SELECT ''").Scan(&dummy)
if err != nil {
tx.Rollback()
return err
}
db.rtx = tx
return nil
}
Purpose: the long-running read transaction pins the WAL so SQLite cannot checkpoint and reset it out from under Litestream, which guarantees that WAL frames remain readable until they have been replicated. Checkpoints therefore have to release and re-acquire it:
func (db *DB) Checkpoint(mode string) error {
// Acquire checkpoint lock
db.chkMu.Lock()
defer db.chkMu.Unlock()
// Close read transaction temporarily
if db.rtx != nil {
db.rtx.Rollback()
db.rtx = nil
}
// Perform checkpoint
_, err := db.db.Exec(fmt.Sprintf("PRAGMA wal_checkpoint(%s)", mode))
if err != nil {
return err
}
// Restart read transaction
return db.initReadTx()
}
// DB struct mutexes
type DB struct {
mu sync.RWMutex // Protects struct fields
chkMu sync.RWMutex // Checkpoint coordination
}
// Replica struct mutexes
type Replica struct {
mu sync.RWMutex // Protects position
muf sync.Mutex // File descriptor lock
}
// Store struct mutex
type Store struct {
mu sync.Mutex // Protects database list
}
Store.Open() (store.go) limits concurrent database opens at startup:
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(50) // Max 50 concurrent DB opens
for _, db := range s.dbs {
db := db
g.Go(func() error { return db.Open() })
}
return g.Wait()
This prevents OS resource exhaustion (file descriptors, memory) when hundreds of databases are configured.
Always acquire locks from the outer component inward: Store.mu first, then DB.mu (and DB.chkMu), then Replica.mu and Replica.muf; never in the reverse order, or two goroutines can deadlock.

Background goroutines are tied to the component's context and tracked with a WaitGroup:
// Start background task
func (db *DB) Start() {
db.wg.Add(1)
go func() {
defer db.wg.Done()
db.monitor()
}()
}
// Stop with timeout
func (db *DB) Close(ctx context.Context) error {
// Signal shutdown
db.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
db.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
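From the caller's side, giving Close a bounded amount of time looks like this (a usage sketch of the method above):

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

// shutdown gives the DB five seconds to stop its background goroutines.
func shutdown(db *litestream.DB) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := db.Close(ctx); err != nil {
		log.Printf("close timed out; goroutines may still be running: %v", err)
	}
}
```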
stateDiagram-v2
[*] --> Closed
Closed --> Opening: Open()
Opening --> Open: Success
Opening --> Closed: Error
Open --> Monitoring: Start()
Monitoring --> Syncing: Changes Detected
Syncing --> Monitoring: Sync Complete
Monitoring --> Checkpointing: Threshold Reached
Checkpointing --> Monitoring: Checkpoint Complete
Monitoring --> Closing: Close()
Closing --> Closed: Cleanup Complete
stateDiagram-v2
[*] --> Idle
Idle --> Starting: Start()
Starting --> Monitoring: Success
Starting --> Idle: Error
Monitoring --> Syncing: Timer/Changes
Syncing --> Uploading: Have Changes
Uploading --> Monitoring: Success
Uploading --> Error: Failed
Error --> Monitoring: Retry
Monitoring --> Stopping: Stop()
Stopping --> Idle: Cleanup
type Pos struct {
TXID TXID // Current transaction ID
PageNo uint32 // Current page number
Checksum uint64 // Running checksum for validation
}
// Update position atomically
func (r *Replica) SetPos(pos ltx.Pos) {
r.mu.Lock() // MUST use Lock, not RLock!
defer r.mu.Unlock()
r.pos = pos
}
// Read position safely
func (r *Replica) Pos() ltx.Pos {
r.mu.RLock()
defer r.mu.RUnlock()
return r.pos
}
sequenceDiagram
participant Main
participant Store
participant DB
participant Replica
participant Monitor
Main->>Store: NewStore(config)
Store->>Store: Validate config
Main->>Store: Open()
loop For each database
Store->>DB: NewDB(path)
Store->>DB: Open()
DB->>DB: Open SQLite connection
DB->>DB: Read page size
DB->>DB: Init metadata
DB->>DB: Start read transaction
loop For each replica
DB->>Replica: NewReplica()
DB->>Replica: Start()
Replica->>Monitor: Start monitoring
end
end
Store->>Store: Start compaction monitors
Store-->>Main: Ready
Database Opening
// Must happen in order:
1. Open SQLite connection
2. Read page size (PRAGMA page_size)
3. Create metadata directory
4. Start long-running read transaction
5. Initialize replicas
6. Start monitor goroutine
Replica Initialization
// Must happen in order:
1. Create replica with client
2. Load previous position from metadata
3. Validate position against database
4. Start sync goroutine (if monitoring enabled)
Errors fall into three categories: recoverable, fatal, and operational.
// Bottom-up error propagation
ReplicaClient.WriteLTXFile() error
↓
Replica.Sync() error
↓
DB.Sync() error
↓
Store.monitorDB() // Logs error, continues
func (r *Replica) syncWithRetry(ctx context.Context) error {
backoff := time.Second
maxBackoff := time.Minute
for attempt := 0; ; attempt++ {
err := r.Sync(ctx)
if err == nil {
return nil
}
// Check if error is retryable
if !isRetryable(err) {
return err
}
// Check context
if ctx.Err() != nil {
return ctx.Err()
}
// Exponential backoff, abandoned early if the context is cancelled
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
| Operation | Complexity | Notes |
|---|---|---|
| WAL Monitor | O(1) | Fixed interval check |
| Page Write | O(1) | Append to LTX file |
| Compaction | O(n) | n = total pages |
| Restoration | O(n*log(m)) | n = pages, m = files |
| File List | O(k) | k = files in level |
| Component | Memory Usage | Disk Usage |
|---|---|---|
| DB | O(1) + metrics | Original DB + WAL |
| Replica | O(1) | LTX files + metadata |
| Compaction | O(n) pages | Temporary during merge |
| Page Index | O(p) | p = pages in file |
Optimization strategies include page index caching, batching of operations, and concurrent operations.
// Database metrics
db_size_bytes // Current database size
wal_size_bytes // Current WAL size
total_wal_bytes // Total bytes written to WAL
checkpoint_count // Number of checkpoints
sync_count // Number of syncs
sync_error_count // Number of sync errors
// Replica metrics
replica_lag_seconds // Replication lag
replica_position // Current replication position
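The gauges held on DB (dbSizeGauge, walSizeGauge, txIDGauge) are standard client_golang collectors. A sketch of how such a gauge is typically created and updated follows; the metric name here is illustrative, not necessarily the exact name Litestream exports.

```go
package example

import "github.com/prometheus/client_golang/prometheus"

var dbSizeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "litestream_db_size_bytes", // illustrative name
		Help: "Current size of the SQLite database file.",
	},
	[]string{"db"},
)

func init() {
	prometheus.MustRegister(dbSizeGauge)
}

// recordDBSize would be called from the monitor loop after each stat of the file.
func recordDBSize(path string, size int64) {
	dbSizeGauge.WithLabelValues(path).Set(float64(size))
}
```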
func (db *DB) HealthCheck() error {
// Check database connection
if err := db.db.Ping(); err != nil {
return fmt.Errorf("database ping failed: %w", err)
}
// Check replication lag
for _, r := range db.replicas {
lag := time.Since(r.LastSync())
if lag > MaxAcceptableLag {
return fmt.Errorf("replica %s lag too high: %v", r.Name(), lag)
}
}
return nil
}