docs/PATTERNS.md
This document contains detailed code patterns, examples, and anti-patterns for working with Litestream. For a quick overview, see AGENTS.md.
- DB Layer (db.go) → Database state, restoration, monitoring
- Replica Layer (replica.go) → Replication mechanics only
- Storage Layer → ReplicaClient implementations
Database restoration logic belongs in the DB layer, not the Replica layer.
When the database is behind the replica (local TXID < remote TXID):
// CORRECT - DB layer handles database state
func (db *DB) init() error {
// DB layer handles database state
if db.needsRestore() {
if err := db.restore(); err != nil {
return err
}
}
// Then start replica for replication only
return db.replica.Start()
}
func (r *Replica) Start() error {
// Replica focuses only on replication
return r.startSync()
}
Reference: DB.checkDatabaseBehindReplica() in db.go:670-737
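The check itself reduces to comparing the local position's TXID against the newest TXID the replica knows about. A minimal sketch of that comparison, assuming a hypothetical `MaxPos` helper on the replica (the real logic lives in `DB.checkDatabaseBehindReplica()`):

```go
// Hedged sketch: decide whether a restore is needed by comparing TXIDs.
// db.replica.MaxPos() is an assumed helper, not necessarily the real API.
func (db *DB) databaseBehindReplica(ctx context.Context) (bool, error) {
	dpos, err := db.Pos() // local database position
	if err != nil {
		return false, fmt.Errorf("local position: %w", err)
	}
	rpos, err := db.replica.MaxPos(ctx) // newest position on the remote replica
	if err != nil {
		return false, fmt.Errorf("replica position: %w", err)
	}
	// Local TXID behind remote TXID means the replica holds data we don't have locally.
	return dpos.TXID < rpos.TXID, nil
}
```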
// WRONG - Replica should only handle replication concerns
func (r *Replica) Start() error {
// DON'T check database state here
if needsRestore() { // Wrong layer!
restoreDatabase() // Wrong layer!
}
// Replica should focus only on replication mechanics
return r.startSync()
}
Always use atomic writes to prevent partial/corrupted files.
// CORRECT - Atomic file write pattern
func writeFileAtomic(path string, data []byte) error {
// Create temp file in same directory (for atomic rename)
dir := filepath.Dir(path)
tmpFile, err := os.CreateTemp(dir, ".tmp-*")
if err != nil {
return fmt.Errorf("create temp file: %w", err)
}
tmpPath := tmpFile.Name()
// Clean up temp file on error
defer func() {
if tmpFile != nil {
tmpFile.Close()
os.Remove(tmpPath)
}
}()
// Write data to temp file
if _, err := tmpFile.Write(data); err != nil {
return fmt.Errorf("write temp file: %w", err)
}
// Sync to ensure data is on disk
if err := tmpFile.Sync(); err != nil {
return fmt.Errorf("sync temp file: %w", err)
}
// Close before rename
if err := tmpFile.Close(); err != nil {
return fmt.Errorf("close temp file: %w", err)
}
tmpFile = nil // Prevent defer cleanup
// Atomic rename (on same filesystem)
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath)
return fmt.Errorf("rename to final path: %w", err)
}
return nil
}
// WRONG - Can leave partial files on failure
func writeFileDirect(path string, data []byte) error {
return os.WriteFile(path, data, 0644) // Not atomic!
}
When you handle an error, ask: "Does the caller need to know about this failure?"
- Yes → return the error. This is the default for virtually all cases in Litestream.
- No → DEBUG log only. This is rare. Only when ALL of these are true:

When in doubt, return the error. In a disaster recovery tool, silent failures are worse than noisy ones.
// CORRECT - Return error for caller to handle
func (db *DB) validatePosition() error {
dpos, err := db.Pos()
if err != nil {
return err
}
rpos := db.replica.Pos()
if dpos.TXID < rpos.TXID {
return fmt.Errorf("database position (%v) behind replica (%v)", dpos, rpos)
}
return nil
}
// WRONG - Silently continuing can cause data corruption
func (db *DB) validatePosition() {
if dpos, _ := db.Pos(); dpos.TXID < replica.Pos().TXID {
log.Printf("warning: position mismatch") // Don't just log!
// Continuing here is dangerous
}
}
// WRONG - Continuing after error can corrupt state
func (db *DB) processFiles() {
for _, file := range files {
if err := processFile(file); err != nil {
log.Printf("error: %v", err) // Just logging!
// Continuing to next file is dangerous
}
}
}
// CORRECT - Let caller decide how to handle errors
func (db *DB) processFiles() error {
for _, file := range files {
if err := processFile(file); err != nil {
return fmt.Errorf("process file %s: %w", file, err)
}
}
return nil
}
// CORRECT - Use Lock() for writes
r.mu.Lock()
defer r.mu.Unlock()
r.pos = pos
// WRONG - Race condition
r.mu.RLock() // Should be Lock() for writes
defer r.mu.RUnlock()
r.pos = pos // Writing with RLock!
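In context, a write-safe setter and a read accessor might look like this. A sketch only, assuming the `mu`/`pos` fields above and `ltx.Pos` as the position type:

```go
// Sketch: exclusive Lock for mutation, shared RLock for reads.
func (r *Replica) SetPos(pos ltx.Pos) {
	r.mu.Lock() // full lock: we are writing shared state
	defer r.mu.Unlock()
	r.pos = pos
}

func (r *Replica) Pos() ltx.Pos {
	r.mu.RLock() // shared lock is fine for read-only access
	defer r.mu.RUnlock()
	return r.pos
}
```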
Many storage backends (S3, R2, etc.) are eventually consistent:
// CORRECT - Check local first during compaction
// db.go:1280-1294 - ALWAYS read from local disk when available
f, err := os.Open(db.LTXPath(info.Level, info.MinTXID, info.MaxTXID))
if err == nil {
// Use local file - it's complete and consistent
return f, nil
}
// Only fall back to remote if local doesn't exist
return replica.Client.OpenLTXFile(...)
// WRONG - Can get partial/corrupt data from eventually consistent storage
f, err := client.OpenLTXFile(ctx, level, minTXID, maxTXID, 0, 0)
During restore, LTX file streams from S3/Tigris may sit idle while the compactor processes lower-numbered pages from the snapshot. Storage providers close these idle connections, causing "unexpected EOF" errors.
The ResumableReader (internal/resumable_reader.go) wraps io.ReadCloser with automatic reconnection:
type LTXFileOpener interface {
OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
}
It resumes reads under these conditions:
- Only while offset < size (the file size must be known)
- Up to a capped number of attempts (resumableReaderMaxRetries = 3)
- It tracks the current offset for range-request resume and reopens with OpenLTXFile(ctx, level, min, max, offset, 0)
- Reconnection is transparent, so reads via io.ReadFull naturally retry

// CORRECT - open with a known size so the reader can resume
rc, _ := client.OpenLTXFile(ctx, level, min, max, 0, fileInfo.Size)
rr := internal.NewResumableReader(ctx, client, level, min, max, fileInfo.Size, rc, logger)
defer rr.Close()
// WRONG - raw stream with unknown size cannot resume after a drop
rc, _ := client.OpenLTXFile(ctx, level, min, max, 0, 0)
// Risk: connection may drop during idle periods in multi-file restore
io.ReadFull(rc, buf) // unexpected EOF!
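The resume loop inside the reader is conceptually simple: on a mid-file error, reopen at the current offset and keep reading. A hedged sketch (field names are assumptions; the real implementation is internal/resumable_reader.go):

```go
// Sketch of resume-on-error; not the actual implementation.
func (r *resumableReader) Read(p []byte) (int, error) {
	for {
		n, err := r.rc.Read(p)
		r.offset += int64(n)
		// Success, normal EOF, unknown size, or retry budget exhausted: pass through.
		if err == nil || err == io.EOF || r.offset >= r.size || r.retries >= resumableReaderMaxRetries {
			return n, err
		}
		// Idle connection dropped mid-file: reopen with a range request at the offset.
		r.retries++
		r.rc.Close()
		rc, oerr := r.client.OpenLTXFile(r.ctx, r.level, r.minTXID, r.maxTXID, r.offset, 0)
		if oerr != nil {
			return n, oerr
		}
		r.rc = rc
		if n > 0 {
			return n, nil // hand back partial data; io.ReadFull will call Read again
		}
	}
}
```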
When using cloud provider lifecycle policies (S3 lifecycle rules, R2 auto-cleanup), Litestream's active file deletion can be disabled:
retention:
  enabled: false
- RetentionConfig{Enabled *bool} in the YAML config (cmd/litestream/main.go)
- Store.SetRetentionEnabled(bool) propagates to all DBs and their compactors (store.go)
- Compactor.RetentionEnabled guards 3 deletion points in compactor.go

store.SetRetentionEnabled(false) // Delegates deletion to cloud lifecycle policies
Disabling retention without cloud lifecycle policies causes unbounded storage growth. Litestream logs a warning: "retention disabled; cloud provider lifecycle policies must handle retention".
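At each guarded deletion point, the compactor simply skips the delete when retention is disabled. A minimal sketch, assuming the Compactor field names shown later in this document and a hypothetical `logger` field; the real guard sites live in compactor.go:

```go
// Sketch of one guarded deletion site.
if c.RetentionEnabled {
	if err := c.client.DeleteLTXFiles(ctx, sourceFiles); err != nil {
		return fmt.Errorf("delete compacted source files: %w", err)
	}
} else {
	// Deletion is delegated to cloud provider lifecycle policies.
	c.logger.Debug("retention disabled; skipping source file deletion")
}
```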
The S3 leaser (s3/leaser.go) uses S3 conditional writes (If-Match/If-None-Match) for distributed locking without an external coordination service.
input := &s3.PutObjectInput{
Bucket: aws.String(l.Bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
}
if etag == "" {
input.IfNoneMatch = aws.String("*") // First acquire: only if key doesn't exist
} else {
input.IfMatch = aws.String(etag) // Expired takeover: only if ETag matches
}
_, err := l.s3.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(l.Bucket),
Key: aws.String(key),
IfMatch: aws.String(lease.ETag), // Only delete if we still hold the lease
})
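What makes this work as a lock is the failure mode: when the condition is not met, S3 rejects the request with HTTP 412 (PreconditionFailed), which the leaser can interpret as "another process holds the lease." A hedged sketch of that interpretation (the sentinel error and surrounding function shape are assumptions):

```go
// Sketch: treat a failed conditional PutObject as a held lease.
if _, err := l.s3.PutObject(ctx, input); err != nil {
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) && apiErr.ErrorCode() == "PreconditionFailed" {
		return nil, ErrLeaseHeld // hypothetical sentinel: someone else owns the lease
	}
	return nil, fmt.Errorf("acquire lease: %w", err)
}
```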
During compaction, preserve the earliest CreatedAt timestamp from source files to maintain temporal granularity for point-in-time restoration.
// CORRECT - Preserve temporal information
info, err := replica.Client.WriteLTXFile(ctx, level, minTXID, maxTXID, r)
if err != nil {
return fmt.Errorf("write ltx: %w", err)
}
info.CreatedAt = oldestSourceFile.CreatedAt
// WRONG - Loses timestamp granularity for point-in-time restores
info := &ltx.FileInfo{
CreatedAt: time.Now(), // Don't use current time during compaction
}
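Computing the preserved timestamp is just a minimum over the compaction inputs. A short sketch (variable names are illustrative):

```go
// Sketch: keep the earliest CreatedAt among the source files being compacted.
createdAt := srcInfos[0].CreatedAt
for _, src := range srcInfos[1:] {
	if src.CreatedAt.Before(createdAt) {
		createdAt = src.CreatedAt
	}
}
info.CreatedAt = createdAt // compacted file inherits the oldest source timestamp
```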
// WRONG - Database state logic in Replica layer
func (r *Replica) Start() error {
if db.needsRestore() { // Wrong layer for DB state!
r.restoreDatabase() // Replica shouldn't manage DB state!
}
return r.sync()
}
// WRONG - Don't reimplement what already exists
func customSnapshotTrigger() {
// Complex custom logic to trigger snapshots
// when db.verify() already does this!
}
// CORRECT - Use what's already there
func triggerSnapshot() error {
return db.verify() // Already handles snapshot logic correctly
}
The lock page at 1GB (0x40000000) must always be skipped:
// db.go:951-953 - Must skip lock page during replication
lockPgno := ltx.LockPgno(pageSize)
if pgno == lockPgno {
continue // Skip this page - it's reserved by SQLite
}
Lock page numbers by page size:
| Page Size | Lock Page Number |
|---|---|
| 4KB | 262145 |
| 8KB | 131073 |
| 16KB | 65537 |
| 32KB | 32769 |
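The table follows directly from the lock page's fixed byte offset: page number = 0x40000000 / pageSize + 1. `ltx.LockPgno` encapsulates this; the arithmetic below is only to show where the numbers come from:

```go
// 1GB offset divided by the page size, plus one (page numbers are 1-indexed).
lockPgno := 0x40000000/pageSize + 1 // e.g. pageSize 4096 → 262145
```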
Responsibilities:
- SQLite connection management (modernc.org/sqlite - no CGO)

Key Fields:
type DB struct {
path string // Database file path
db *sql.DB // SQLite connection
rtx *sql.Tx // Long-running read transaction
pageSize int // Database page size (critical for lock page)
notify chan struct{} // Notifies on WAL changes
}
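The `notify` channel is how other components learn about WAL changes; a plausible consumer waits on it alongside a context. The close-and-replace broadcast semantics assumed here are an illustration, not a guaranteed API:

```go
// Sketch: block until the WAL changes or the context is cancelled.
select {
case <-db.notify: // assumed to be closed (and replaced) when the WAL changes
	// WAL changed; sync pending frames to the replica
case <-ctx.Done():
	return ctx.Err()
}
```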
Initialization Sequence:
Responsibilities:
Key Operations:
- Sync(): Synchronizes pending changes
- SetPos(): Updates replication position (must use Lock, not RLock!)
- Snapshot(): Creates full database snapshot

Required Methods:
type ReplicaClient interface {
Type() string // Client type identifier
// File operations
LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
DeleteLTXFiles(ctx context.Context, files []*ltx.FileInfo) error
DeleteAll(ctx context.Context) error
}
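Because every backend satisfies the same interface, code that needs LTX files can be written once against `ReplicaClient`. For example, a hedged sketch that copies a single LTX file between any two backends using only the methods above (the helper itself is hypothetical):

```go
// Sketch: move one LTX file from src to dst using only the ReplicaClient interface.
func copyLTXFile(ctx context.Context, src, dst ReplicaClient, level int, minTXID, maxTXID ltx.TXID) error {
	rc, err := src.OpenLTXFile(ctx, level, minTXID, maxTXID, 0, 0) // full file: offset 0, size 0
	if err != nil {
		return fmt.Errorf("open source ltx: %w", err)
	}
	defer rc.Close()

	if _, err := dst.WriteLTXFile(ctx, level, minTXID, maxTXID, rc); err != nil {
		return fmt.Errorf("write destination ltx: %w", err)
	}
	return nil
}
```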
useMetadata Parameter:
- useMetadata=true: Fetch accurate timestamps from backend metadata (required for point-in-time restores)
- useMetadata=false: Use fast timestamps for normal operations

Responsibilities:
- Operates on remote LTX files through the ReplicaClient interface

Key Fields:
type Compactor struct {
client ReplicaClient
VerifyCompaction bool // Post-compaction TXID consistency check
RetentionEnabled bool // Default: true. Controls active file deletion
// Local file optimization (set by DB layer)
LocalFileOpener func(level int, minTXID, maxTXID ltx.TXID) (io.ReadCloser, error)
LocalFileDeleter func(level int, minTXID, maxTXID ltx.TXID) error
// Level max-file-info caching
CacheGetter func(level int) (*ltx.FileInfo, bool)
CacheSetter func(level int, info *ltx.FileInfo)
}
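The `LocalFileOpener` hook is how the eventual-consistency rule from earlier shows up inside the compactor: prefer the local copy, fall back to the remote client. A sketch under that assumption (the helper name is illustrative):

```go
// Sketch: prefer a complete local LTX file; fall back to remote storage.
func (c *Compactor) openSource(ctx context.Context, level int, minTXID, maxTXID ltx.TXID) (io.ReadCloser, error) {
	if c.LocalFileOpener != nil {
		if f, err := c.LocalFileOpener(level, minTXID, maxTXID); err == nil {
			return f, nil // local file is complete and consistent
		}
	}
	return c.client.OpenLTXFile(ctx, level, minTXID, maxTXID, 0, 0)
}
```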
Default Compaction Levels:
var defaultLevels = CompactionLevels{
    {Level: 0, Interval: 0},               // Raw LTX files (no compaction)
    {Level: 1, Interval: 30 * time.Second},
    {Level: 2, Interval: 5 * time.Minute},
    {Level: 3, Interval: 1 * time.Hour},
    // Snapshots created daily (24h retention)
}
# Always run with race detector
go test -race -v ./...
# Specific race-prone areas
go test -race -v -run TestReplica_Sync ./...
go test -race -v -run TestDB_Sync ./...
go test -race -v -run TestStore_CompactDB ./...
# Test with various page sizes
./bin/litestream-test populate -db test.db -page-size 4096 -target-size 2GB
./bin/litestream-test populate -db test.db -page-size 8192 -target-size 2GB
# Validate lock page handling
./bin/litestream-test validate -source-db test.db -replica-url file:///tmp/replica
# Test specific backend
go test -v ./replica_client_test.go -integration s3
go test -v ./replica_client_test.go -integration gcs
go test -v ./replica_client_test.go -integration abs
go test -v ./replica_client_test.go -integration oss
go test -v ./replica_client_test.go -integration sftp