.tasks/core/LSYNC-022-sync-metrics-and-observability.md
The sync system operates as a black box with no visibility into its behavior, performance, or health.
Users and developers need comprehensive metrics to understand what the sync system is doing, when it's active, how much data has been synced, and where problems occur.
┌─────────────────────────────────────────┐
│  Sync Components                        │
│  (PeerSync, BackfillManager, etc.)      │
└──────────────┬──────────────────────────┘
               │ Record metrics
               ▼
┌─────────────────────────────────────────┐
│  SyncMetricsCollector                   │
│  - Aggregates metrics from all sources  │
│  - Maintains atomic counters            │
│  - Tracks time-series data              │
└──────────────┬──────────────────────────┘
               │ Query
               ▼
┌─────────────────────────────────────────┐
│  Metrics Storage                        │
│  - In-memory ring buffer (last N)       │
│  - Persistent snapshots (database)      │
│  - Real-time atomic counters            │
└──────────────┬──────────────────────────┘
               │ Export
               ▼
┌─────────────────────────────────────────┐
│  Query Interfaces                       │
│  - CLI: `sd sync metrics`               │
│  - API: GraphQL/REST endpoint           │
│  - Events: Real-time updates            │
└─────────────────────────────────────────┘
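The record-then-query flow in the diagram can be sketched with a cheaply cloneable collector handle. This is a minimal illustration using only the standard library; the `Counters` and `CountersSnapshot` types and the `record_*` method names are assumptions made for this example, not the project's actual API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Live counters, updated lock-free by sync components (hypothetical subset).
#[derive(Default)]
pub struct Counters {
    broadcasts_sent: AtomicU64,
    changes_received: AtomicU64,
}

/// Plain-value copy of the counters at one point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CountersSnapshot {
    pub broadcasts_sent: u64,
    pub changes_received: u64,
}

/// Handle that each sync component can clone and keep; all clones share state.
#[derive(Clone, Default)]
pub struct SyncMetricsCollector {
    counters: Arc<Counters>,
}

impl SyncMetricsCollector {
    /// Called by a component after it sends one broadcast.
    pub fn record_broadcast(&self) {
        self.counters.broadcasts_sent.fetch_add(1, Ordering::Relaxed);
    }

    /// Called by a component after it receives `n` changes.
    pub fn record_received(&self, n: u64) {
        self.counters.changes_received.fetch_add(n, Ordering::Relaxed);
    }

    /// Reads every counter into a plain struct for the query interfaces.
    /// Each load is atomic, but the snapshot as a whole is not.
    pub fn snapshot(&self) -> CountersSnapshot {
        CountersSnapshot {
            broadcasts_sent: self.counters.broadcasts_sent.load(Ordering::Relaxed),
            changes_received: self.counters.changes_received.load(Ordering::Relaxed),
        }
    }
}
```

Relaxed ordering is used here on the assumption that counters are purely informational and never used to synchronize other memory.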
struct SyncStateMetrics {
    // Current state
    current_state: SyncState, // Uninitialized, Backfilling, CatchingUp, Ready
    state_entered_at: DateTime<Utc>,
    // State history (last N transitions)
    state_history: VecDeque<StateTransition>,
    total_time_in_state: HashMap<SyncState, Duration>,
    transition_count: HashMap<(SyncState, SyncState), u64>,
}

struct StateTransition {
    from: SyncState,
    to: SyncState,
    timestamp: DateTime<Utc>,
    reason: Option<String>, // e.g., "peer connected", "backfill complete"
}
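One way the state metrics above might be updated on each transition is sketched below. It is an assumption-laden illustration: `StateTracker` and its `transition` method are hypothetical names, `std::time::Instant` stands in for `DateTime<Utc>` so the example needs no external crates, and the caller passes the current time explicitly to keep the logic testable.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyncState { Uninitialized, Backfilling, CatchingUp, Ready }

#[derive(Debug, Clone)]
pub struct StateTransition {
    pub from: SyncState,
    pub to: SyncState,
    pub at: Instant,
    pub reason: Option<String>,
}

/// Hypothetical tracker mirroring the fields of `SyncStateMetrics`.
pub struct StateTracker {
    pub current: SyncState,
    pub entered_at: Instant,
    pub history: VecDeque<StateTransition>,
    pub capacity: usize, // the "last N" bound on `history`
    pub total_time_in_state: HashMap<SyncState, Duration>,
    pub transition_count: HashMap<(SyncState, SyncState), u64>,
}

impl StateTracker {
    pub fn new(capacity: usize, now: Instant) -> Self {
        Self {
            current: SyncState::Uninitialized,
            entered_at: now,
            history: VecDeque::with_capacity(capacity),
            capacity,
            total_time_in_state: HashMap::new(),
            transition_count: HashMap::new(),
        }
    }

    pub fn transition(&mut self, to: SyncState, now: Instant, reason: Option<String>) {
        let from = self.current;
        // Credit the elapsed time to the state being left.
        *self.total_time_in_state.entry(from).or_default() +=
            now.duration_since(self.entered_at);
        *self.transition_count.entry((from, to)).or_default() += 1;
        // Keep only the most recent `capacity` transitions.
        if self.capacity > 0 {
            if self.history.len() == self.capacity {
                self.history.pop_front();
            }
            self.history.push_back(StateTransition { from, to, at: now, reason });
        }
        self.current = to;
        self.entered_at = now;
    }
}
```

Time spent in the current state is only credited when that state is left, so a reader of `total_time_in_state` would need to add `now - entered_at` for the active state.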
struct OperationMetrics {
    // Broadcasts
    broadcasts_sent: AtomicU64,
    state_changes_broadcast: AtomicU64,
    shared_changes_broadcast: AtomicU64,
    broadcast_batches_sent: AtomicU64,
    failed_broadcasts: AtomicU64,
    // Receives
    changes_received: AtomicU64,
    changes_applied: AtomicU64,
    changes_rejected: AtomicU64, // Failed to apply
    buffer_queue_depth: AtomicU64,
    // Backfill
    active_backfill_sessions: AtomicU64,
    backfill_sessions_completed: AtomicU64,
    backfill_pagination_rounds: AtomicU64,
    // Retries
    retry_queue_depth: AtomicU64,
    retry_attempts: AtomicU64,
    retry_successes: AtomicU64,
}
struct DataVolumeMetrics {
    // Per-model counters
    entries_synced: HashMap<String, AtomicU64>, // model_type -> count
    // Per-device counters
    entries_by_device: HashMap<Uuid, DeviceMetrics>,
    // Bytes transferred
    bytes_sent: AtomicU64,
    bytes_received: AtomicU64,
    // Last sync timestamps
    last_sync_per_peer: HashMap<Uuid, DateTime<Utc>>,
    last_sync_per_model: HashMap<String, DateTime<Utc>>,
}

struct DeviceMetrics {
    device_id: Uuid,
    device_name: String,
    entries_received: AtomicU64,
    last_seen: AtomicU64, // Unix timestamp
    is_online: AtomicBool,
}
struct PerformanceMetrics {
    // Latency tracking
    broadcast_latency_ms: HistogramMetric,
    apply_latency_ms: HistogramMetric,
    backfill_request_latency_ms: HistogramMetric,
    // Watermark tracking
    state_watermark: AtomicU64, // Unix timestamp
    shared_watermark: String, // HLC string (protected by RwLock)
    watermark_lag_ms: HashMap<Uuid, AtomicU64>, // Per-peer lag
    // HLC drift
    hlc_physical_drift_ms: AtomicI64,
    hlc_counter_max: AtomicU64,
    // Database performance
    db_query_duration_ms: HistogramMetric,
    db_query_count: AtomicU64,
}

struct HistogramMetric {
    count: AtomicU64,
    sum: AtomicU64,
    min: AtomicU64,
    max: AtomicU64,
    // Could add percentiles later
}
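A lock-free `record` and `avg` for `HistogramMetric` could look roughly like the following. The method names are assumptions (the document only shows `avg()` being called in a test), and the choice to initialize `min` to `u64::MAX` as an "empty" sentinel is one option among several.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

pub struct HistogramMetric {
    count: AtomicU64,
    sum: AtomicU64,
    min: AtomicU64,
    max: AtomicU64,
}

impl HistogramMetric {
    pub fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            sum: AtomicU64::new(0),
            // u64::MAX means "nothing recorded yet", so the first sample always wins.
            min: AtomicU64::new(u64::MAX),
            max: AtomicU64::new(0),
        }
    }

    /// Records one sample (for example, a latency in milliseconds).
    pub fn record(&self, value: u64) {
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value, Ordering::Relaxed);
        self.min.fetch_min(value, Ordering::Relaxed);
        self.max.fetch_max(value, Ordering::Relaxed);
    }

    /// Integer mean of all samples, or 0 when empty. `count` and `sum` are
    /// loaded separately, so a concurrent `record` can skew the result slightly.
    pub fn avg(&self) -> u64 {
        let count = self.count.load(Ordering::Relaxed);
        if count == 0 {
            0
        } else {
            self.sum.load(Ordering::Relaxed) / count
        }
    }

    /// Smallest sample seen, or `None` when empty.
    pub fn min(&self) -> Option<u64> {
        if self.count.load(Ordering::Relaxed) == 0 {
            None
        } else {
            Some(self.min.load(Ordering::Relaxed))
        }
    }

    /// Largest sample seen, or `None` when empty.
    pub fn max(&self) -> Option<u64> {
        if self.count.load(Ordering::Relaxed) == 0 {
            None
        } else {
            Some(self.max.load(Ordering::Relaxed))
        }
    }
}
```

Because only count, sum, min, and max are kept, this supports the "avg … max …" lines of the CLI output but not percentiles, which would need bucketed storage.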
struct ErrorMetrics {
    // Error counts by type
    total_errors: AtomicU64,
    network_errors: AtomicU64,
    database_errors: AtomicU64,
    apply_errors: AtomicU64,
    validation_errors: AtomicU64,
    // Recent errors (ring buffer)
    recent_errors: Arc<RwLock<VecDeque<ErrorEvent>>>,
    // Conflict resolution
    conflicts_detected: AtomicU64,
    conflicts_resolved_by_hlc: AtomicU64,
}

struct ErrorEvent {
    timestamp: DateTime<Utc>,
    error_type: String,
    message: String,
    model_type: Option<String>,
    device_id: Option<Uuid>,
}
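The "recent errors" ring buffer can be approximated with a bounded `VecDeque` behind an `RwLock`. The sketch below is illustrative only: `ErrorLog`, its methods, and the trimmed-down `ErrorEvent` (no timestamp, model, or device fields) are assumptions made to keep the example dependency-free.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Simplified stand-in for the document's `ErrorEvent`.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorEvent {
    pub error_type: String,
    pub message: String,
}

/// Hypothetical holder for the total counter plus the bounded recent list.
pub struct ErrorLog {
    total_errors: AtomicU64,
    recent_errors: Arc<RwLock<VecDeque<ErrorEvent>>>,
    capacity: usize,
}

impl ErrorLog {
    pub fn new(capacity: usize) -> Self {
        Self {
            total_errors: AtomicU64::new(0),
            recent_errors: Arc::new(RwLock::new(VecDeque::with_capacity(capacity))),
            capacity,
        }
    }

    /// Counts the error and stores it, evicting the oldest entry when full.
    pub fn record(&self, error_type: &str, message: &str) {
        self.total_errors.fetch_add(1, Ordering::Relaxed);
        if self.capacity == 0 {
            return;
        }
        // A poisoned lock still holds usable data, so recover the guard.
        let mut recent = self.recent_errors.write().unwrap_or_else(|e| e.into_inner());
        if recent.len() == self.capacity {
            recent.pop_front();
        }
        recent.push_back(ErrorEvent {
            error_type: error_type.to_string(),
            message: message.to_string(),
        });
    }

    pub fn total(&self) -> u64 {
        self.total_errors.load(Ordering::Relaxed)
    }

    /// Copies the retained events, oldest first.
    pub fn recent(&self) -> Vec<ErrorEvent> {
        let recent = self.recent_errors.read().unwrap_or_else(|e| e.into_inner());
        recent.iter().cloned().collect()
    }
}
```

The total keeps growing after old events are evicted, which matches the split in the CLI output between an error count and a short list of recent errors.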
Files to create:
- `core/src/service/sync/metrics/mod.rs` - Main module
- `core/src/service/sync/metrics/collector.rs` - Central collector
- `core/src/service/sync/metrics/types.rs` - Metric types
- `core/src/service/sync/metrics/snapshot.rs` - Point-in-time snapshots
- `core/src/service/sync/metrics/history.rs` - Time-series storage
Tasks:
- `SyncMetricsCollector` with thread-safe access
Files to modify:
- `core/src/service/sync/peer.rs` - Add metrics recording
- `core/src/service/sync/backfill.rs` - Track backfill metrics
- `core/src/service/sync/state.rs` - Track state transitions
- `core/src/service/network/protocol/sync/handler.rs` - Track message handling
Tasks:
Files to create:
- `crates/cli/src/commands/sync/metrics.rs` - CLI command
Command structure:
# Get current metrics snapshot
sd sync metrics
# Get metrics for specific time range
sd sync metrics --since "1 hour ago"
sd sync metrics --since "2025-10-23 10:00:00"
# Watch metrics in real-time
sd sync metrics --watch
# Get metrics for specific peer
sd sync metrics --peer <device-id>
# Get metrics for specific model type
sd sync metrics --model entry
# Export metrics as JSON
sd sync metrics --json
# Show only specific categories
sd sync metrics --state # State transitions only
sd sync metrics --operations # Operation counters only
sd sync metrics --errors # Recent errors only
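The relative form of `--since` (such as "1 hour ago") needs to be turned into a duration before it can be applied as a cutoff. The helper below is a hypothetical sketch of that step; the function name, the accepted units, and the decision to reject absolute timestamps (which a real implementation would parse separately) are all assumptions.

```rust
use std::time::Duration;

/// Parses strings of the form "<N> <unit> ago", where unit is
/// second(s), minute(s), hour(s), or day(s). Returns `None` otherwise.
pub fn parse_relative_since(input: &str) -> Option<Duration> {
    let mut parts = input.split_whitespace();
    let amount: u64 = parts.next()?.parse().ok()?;
    let unit = parts.next()?;
    // Require exactly one trailing "ago" and nothing after it.
    if parts.next()? != "ago" || parts.next().is_some() {
        return None;
    }
    // Accept both singular and plural unit names.
    let secs_per_unit: u64 = match unit.strip_suffix('s').unwrap_or(unit) {
        "second" => 1,
        "minute" => 60,
        "hour" => 3_600,
        "day" => 86_400,
        _ => return None,
    };
    amount.checked_mul(secs_per_unit).map(Duration::from_secs)
}
```

The caller would subtract the returned duration from the current time to obtain the `since` timestamp passed to the metrics query.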
Output format:
Sync Metrics (Library: My Library)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
State
Current: Ready
Uptime: 2h 34m 12s
Last transition: 1h 23m ago (CatchingUp → Ready)
Time in state:
├─ Ready: 1h 23m (53.9%)
├─ CatchingUp: 47m (30.5%)
└─ Backfilling: 24m (15.6%)
Operations (Last hour)
Broadcasts:
├─ Total: 1,234 messages
├─ State changes: 547 (44%)
├─ Shared changes: 687 (56%)
└─ Batches: 23 (avg 53.7 items/batch)
Receives:
├─ Changes received: 2,567
├─ Changes applied: 2,563 (99.8%)
├─ Changes rejected: 4 (0.2%)
└─ Buffer queue: 0 items
Backfill:
├─ Sessions: 2 completed
├─ Pagination rounds: 12
└─ Avg session duration: 8.4s
Data Volume
Total synced: 105,432 records
By model:
├─ entries: 105,000 (99.6%)
├─ content_identities: 345 (0.3%)
├─ locations: 12 (0.01%)
└─ tags: 75 (0.07%)
By device:
├─ Device A (MacBook): 75,000 records (online)
├─ Device B (Desktop): 30,000 records (online)
└─ This device: 432 records
Bandwidth:
├─ Sent: 45.2 MB
├─ Received: 127.8 MB
└─ Total: 173.0 MB
Performance
Latency:
├─ Broadcast: avg 45ms, max 234ms
├─ Apply: avg 12ms, max 89ms
└─ Backfill request: avg 342ms, max 2.1s
Sync lag:
├─ Device A: 0ms (synced)
├─ Device B: 145ms (0.1s behind)
└─ Watermark age: 2.3s
HLC:
├─ Physical drift: +12ms
└─ Counter max: 3
Errors (Last hour)
Total: 3 errors
By type:
├─ Network: 2 (timeout, connection reset)
└─ Apply: 1 (foreign key constraint)
Retry queue: 0 pending
Recent errors:
[13:45:23] Network timeout: failed to send to device B
[13:42:10] Apply failed: missing parent entry (entry_id: 12345)
[13:38:55] Network error: connection reset by peer
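Two small formatting helpers cover most of the derived values in the sample output: uptime strings such as "2h 34m 12s" and percentage shares such as "99.8%". The function names below are hypothetical, and the exact display rules (for example, whether to omit zero hours) are assumptions inferred from the sample.

```rust
/// Formats a duration in seconds as "Xh Ym Zs", dropping leading zero units.
pub fn format_uptime(total_secs: u64) -> String {
    let hours = total_secs / 3_600;
    let minutes = (total_secs % 3_600) / 60;
    let seconds = total_secs % 60;
    if hours > 0 {
        format!("{}h {}m {}s", hours, minutes, seconds)
    } else if minutes > 0 {
        format!("{}m {}s", minutes, seconds)
    } else {
        format!("{}s", seconds)
    }
}

/// Formats `part / total` as a percentage with one decimal place.
/// An empty total is reported as "0.0%" rather than dividing by zero.
pub fn format_share(part: u64, total: u64) -> String {
    if total == 0 {
        return "0.0%".to_string();
    }
    format!("{:.1}%", part as f64 * 100.0 / total as f64)
}
```

For instance, `format_share(2563, 2567)` corresponds to the "Changes applied" line in the sample, and `format_uptime(9252)` to the "Uptime" line.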
Files to create:
- `core/src/ops/sync/get_metrics/mod.rs` - Query for metrics
- `core/src/ops/sync/get_metrics/action.rs` - Action implementation
Query implementation:
// Define the query
pub struct GetSyncMetrics;

impl LibraryQuery for GetSyncMetrics {
    type Input = GetSyncMetricsInput;
    type Output = SyncMetricsSnapshot;

    async fn execute(input: Self::Input, ctx: LibraryQueryContext) -> Result<Self::Output> {
        let metrics = ctx.library().sync_service()?.metrics();
        // Apply filters
        let mut snapshot = metrics.snapshot();
        if let Some(since) = input.since {
            snapshot.filter_since(since);
        }
        if let Some(peer_id) = input.peer_id {
            snapshot.filter_by_peer(peer_id);
        }
        if let Some(model_type) = input.model_type {
            snapshot.filter_by_model(&model_type);
        }
        Ok(snapshot)
    }
}

// Input/Output types
#[derive(Deserialize, Serialize, Type)]
pub struct GetSyncMetricsInput {
    pub since: Option<DateTime<Utc>>,
    pub peer_id: Option<Uuid>,
    pub model_type: Option<String>,
}

// Usage via ApiDispatcher:
let metrics = dispatcher
    .execute_library_query::<GetSyncMetrics>(input, session)
    .await?;
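The `filter_since` and `filter_by_peer` calls in the query are not defined in this document. One plausible reading is that they prune the time-stamped parts of a snapshot in place, as sketched below. Everything here is an assumption: the snapshot is reduced to its recent-error list, timestamps are Unix seconds rather than `DateTime<Utc>`, and device IDs are `u32` rather than `Uuid`, so the example compiles without external crates.

```rust
/// Simplified stand-in for the document's `ErrorEvent`.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorEvent {
    pub timestamp_secs: u64,    // Unix seconds (assumption)
    pub device_id: Option<u32>, // u32 in place of Uuid (assumption)
    pub message: String,
}

/// Hypothetical, heavily reduced snapshot holding only time-stamped data.
#[derive(Debug, Default)]
pub struct SyncMetricsSnapshot {
    pub recent_errors: Vec<ErrorEvent>,
}

impl SyncMetricsSnapshot {
    /// Drops events recorded before `since_secs`.
    pub fn filter_since(&mut self, since_secs: u64) {
        self.recent_errors.retain(|e| e.timestamp_secs >= since_secs);
    }

    /// Keeps only events attributed to `peer`.
    pub fn filter_by_peer(&mut self, peer: u32) {
        self.recent_errors.retain(|e| e.device_id == Some(peer));
    }
}
```

Cumulative counters carry no timestamps, so a time filter of this kind can only narrow event lists and history; windowed counter values would have to be derived from stored snapshots instead.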
Event emission: Emit events on metric updates for UI real-time display via the existing event bus:
event_bus.emit(Event::SyncMetricsUpdated {
    library_id,
    snapshot: metrics.snapshot(),
});
Store periodic snapshots in database for historical analysis:
CREATE TABLE sync_metrics_snapshots (
    id INTEGER PRIMARY KEY,
    library_id TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    snapshot_json TEXT NOT NULL -- JSON of full metrics
);

CREATE INDEX idx_metrics_library_time
    ON sync_metrics_snapshots(library_id, timestamp);
- `SyncMetricsCollector` integrated into sync components
- `sd sync metrics` displays formatted metrics
- Time-range queries (`--since`)
- Real-time watch mode (`--watch`)
#[tokio::test]
async fn test_sync_metrics_tracking() {
    let (device_a, device_b) = setup_paired_devices().await;
    // Enable metrics
    let metrics = device_a.sync_metrics();
    // Perform sync operations
    create_entries(device_a, 100).await;
    wait_for_sync().await;
    // Verify metrics
    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.operations.broadcasts_sent, 100);
    assert_eq!(snapshot.data_volume.entries_synced.get("entry"), Some(&100));
    assert!(snapshot.performance.broadcast_latency_ms.avg() < 100);
}
New files:
- `core/src/service/sync/metrics/mod.rs`
- `core/src/service/sync/metrics/collector.rs`
- `core/src/service/sync/metrics/types.rs`
- `core/src/service/sync/metrics/snapshot.rs`
- `core/src/service/sync/metrics/history.rs`
- `crates/cli/src/commands/sync/metrics.rs`
Modified files:
- `core/src/service/sync/peer.rs`
- `core/src/service/sync/backfill.rs`
- `core/src/service/sync/state.rs`
- `core/src/service/network/protocol/sync/handler.rs`
- `docs/core/library-sync.mdx`