.tasks/core/LSYNC-012-entry-sync-bulk-optimization.md
Optimize entry (file/folder) synchronization for bulk indexing operations using efficient state broadcasts. When a device indexes 1M files, avoid sending 1M individual StateChange messages by using batched transfers or on-demand loading.
Problem: Device A indexes 1M files. The naive approach broadcasts 1M individual StateChange messages, one per entry. This doesn't scale.
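A quick back-of-the-envelope sketch of why batching helps; the helper name and the 1,000-record batch size are illustrative, not from the codebase:

```rust
/// Number of batch messages needed to cover `total` records at a
/// given `batch_size` (hypothetical helper, for illustration only).
fn batch_count(total: usize, batch_size: usize) -> usize {
    total.div_ceil(batch_size)
}

fn main() {
    // 1M entries as individual messages: 1,000,000 sends.
    // The same entries in 1,000-record batches: 1,000 sends.
    assert_eq!(batch_count(1_000_000, 1_000), 1_000);
    // A partial final chunk still needs its own batch.
    assert_eq!(batch_count(1_500, 1_000), 2);
}
```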
Approach 1: Batched state transfer

```rust
// Device A finishes indexing a location
let entries = query_all_entries_for_location(location_id).await?;

// Send in efficient batches
for chunk in entries.chunks(1000) {
    broadcast_to_peers(StateBatch {
        model_type: "entry",
        device_id: MY_DEVICE_ID,
        // `?` cannot escape a plain map closure, so collect into a Result
        records: chunk
            .iter()
            .map(|e| {
                Ok(StateRecord {
                    uuid: e.uuid,
                    data: serde_json::to_value(e)?,
                    timestamp: e.updated_at,
                })
            })
            .collect::<Result<Vec<_>>>()?,
    }).await?;
}
```
Benefits:
- 1,000 messages instead of 1,000,000
- Similar records compress well together
- Receivers can apply each batch in a single transaction
Approach 2: Bulk notification (lazy sync)

```rust
// Device A finishes indexing
broadcast_to_peers(BulkIndexComplete {
    device_id: MY_DEVICE_ID,
    location_id: location.uuid,
    entry_count: 1_000_000,
    indexed_at: Utc::now(),
}).await?;

// Peers decide what to do:
// Option A: Request entries on-demand (lazy loading)
// Option B: If the same location exists locally, trigger own indexing
// Option C: Request full dump for initial sync
```
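The three peer reactions above can be sketched as one small decision function; every name here (`PeerReaction`, `decide_reaction`) is hypothetical, not from the codebase:

```rust
/// Illustrative sketch of the peer-side decision (names hypothetical).
#[derive(Debug, PartialEq)]
enum PeerReaction {
    LazyFetch,      // Option A: request entries on-demand
    ReindexLocally, // Option B: the same location exists locally
    FullDump,       // Option C: initial sync, request a snapshot
}

fn decide_reaction(has_any_data: bool, has_same_location: bool) -> PeerReaction {
    if !has_any_data {
        PeerReaction::FullDump
    } else if has_same_location {
        PeerReaction::ReindexLocally
    } else {
        PeerReaction::LazyFetch
    }
}

fn main() {
    assert_eq!(decide_reaction(false, false), PeerReaction::FullDump);
    assert_eq!(decide_reaction(true, true), PeerReaction::ReindexLocally);
    assert_eq!(decide_reaction(true, false), PeerReaction::LazyFetch);
}
```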
Benefits:
- The notification itself is tiny (~1KB) regardless of entry count
- Peers fetch only the data they actually need, when they need it
- No bandwidth wasted on peers that don't track this location
Approach 3: Database snapshot (initial sync)

```rust
// New device joins with 0 entries.
// Instead of requesting 1M entries via messages,
// request a database snapshot:
let snapshot = peer.export_device_state(device_id).await?;
// Returns a SQLite database dump of just Device A's data
import_database_snapshot(snapshot).await?;
// Fast: direct database import
```
Benefits:
- One transfer instead of millions of individual messages
- No per-record serialization or merge work on the receiver
- Fastest path for a brand-new device
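As a toy illustration of the snapshot idea, here is a round-trip through an opaque byte blob; the length-prefixed format and function names are invented for this sketch (the real implementation ships a SQLite dump):

```rust
// One opaque blob replaces per-entry messages: export flattens a
// peer's state, import restores it wholesale on the other side.

fn export_snapshot(entries: &[String]) -> Vec<u8> {
    // Length-prefixed encoding: [u32 little-endian len][utf-8 bytes] per entry.
    let mut buf = Vec::new();
    for e in entries {
        buf.extend_from_slice(&(e.len() as u32).to_le_bytes());
        buf.extend_from_slice(e.as_bytes());
    }
    buf
}

fn import_snapshot(mut blob: &[u8]) -> Vec<String> {
    let mut out = Vec::new();
    while blob.len() >= 4 {
        let len = u32::from_le_bytes(blob[..4].try_into().unwrap()) as usize;
        out.push(String::from_utf8(blob[4..4 + len].to_vec()).unwrap());
        blob = &blob[4 + len..];
    }
    out
}

fn main() {
    let entries = vec!["photo.jpg".to_string(), "notes.md".to_string()];
    let blob = export_snapshot(&entries);
    // The receiver reconstructs the full set from one transfer.
    assert_eq!(import_snapshot(&blob), entries);
}
```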
File: core/src/service/sync/state.rs

```rust
/// Broadcast large state changes efficiently
pub async fn broadcast_bulk_state(
    &self,
    model_type: &str,
    records: Vec<StateRecord>,
) -> Result<()> {
    const BATCH_SIZE: usize = 1000;
    let total_batches = records.len().div_ceil(BATCH_SIZE);

    for (i, chunk) in records.chunks(BATCH_SIZE).enumerate() {
        let batch = SyncMessage::StateBatch {
            model_type: model_type.to_string(),
            device_id: self.device_id,
            batch_index: i,
            total_batches,
            records: chunk.to_vec(),
        };

        // Compress before sending
        let compressed = compress_batch(&batch)?;

        // Broadcast to all peers
        for peer in self.get_connected_sync_partners().await? {
            self.send_to_peer(peer, compressed.clone()).await?;
        }

        // Rate limit to avoid overwhelming receivers
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    Ok(())
}
```
File: core/src/service/sync/state.rs

```rust
/// Handle bulk notification from peer
async fn on_bulk_index_complete(
    &self,
    notification: BulkIndexComplete,
) -> Result<()> {
    info!(
        device = %notification.device_id,
        location = %notification.location_id,
        count = notification.entry_count,
        "Peer completed bulk indexing"
    );

    // Check if we need this data
    let should_sync = self.should_sync_location(
        notification.device_id,
        notification.location_id,
    ).await?;

    if should_sync {
        // Queue background job to fetch entries
        self.queue_bulk_fetch_job(
            notification.device_id,
            notification.location_id,
        ).await?;
    } else {
        // Just record that the peer has this data (for future queries)
        self.update_peer_state(notification).await?;
    }

    Ok(())
}
```
File: core/src/service/sync/snapshot.rs

```rust
/// Export device-owned state as a database snapshot
pub async fn export_device_snapshot(
    &self,
    device_id: Uuid,
) -> Result<Vec<u8>> {
    // Create a temporary in-memory database
    let temp_db = Database::memory().await?;

    // Copy only this device's data
    let locations = location::Entity::find()
        .filter(location::Column::DeviceId.eq(device_id))
        .all(self.db.conn())
        .await?;
    let entries = entry::Entity::find()
        .filter(entry::Column::DeviceId.eq(device_id)) // Via location
        .all(self.db.conn())
        .await?;

    // Insert into the temp database. SeaORM queries return read-only
    // Models, so convert back to ActiveModels (with values marked Set)
    // before inserting.
    for loc in locations {
        loc.into_active_model().reset_all().insert(temp_db.conn()).await?;
    }
    for entry in entries {
        entry.into_active_model().reset_all().insert(temp_db.conn()).await?;
    }

    // Export as a binary blob
    let snapshot = temp_db.export_to_bytes().await?;
    Ok(snapshot)
}
```
Strategy selection:

| Scenario | Strategy | Reason |
|---|---|---|
| New device joins | Database snapshot | Fast initial sync |
| Incremental sync (few changes) | Individual StateChange | Simple, immediate |
| Large batch (100-10K entries) | Batched StateBatch | Efficient, streaming |
| Massive index (100K+ entries) | Bulk notification + on-demand | Bandwidth-aware |
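The selection rules in the table above could be folded into a single dispatch function; the function name and exact thresholds below are illustrative assumptions:

```rust
#[derive(Debug, PartialEq)]
enum SyncStrategy {
    Snapshot,          // new device joins
    IndividualChanges, // small incremental updates
    Batched,           // medium-sized bursts
    BulkNotification,  // massive indexing runs
}

/// Hypothetical dispatch mirroring the strategy table; thresholds assumed.
fn pick_strategy(is_new_device: bool, change_count: usize) -> SyncStrategy {
    if is_new_device {
        SyncStrategy::Snapshot
    } else if change_count < 100 {
        SyncStrategy::IndividualChanges
    } else if change_count <= 10_000 {
        SyncStrategy::Batched
    } else {
        // Anything above the batched range (including the 10K-100K gap
        // the table leaves open) falls back to notify-then-fetch here.
        SyncStrategy::BulkNotification
    }
}

fn main() {
    assert_eq!(pick_strategy(true, 0), SyncStrategy::Snapshot);
    assert_eq!(pick_strategy(false, 5), SyncStrategy::IndividualChanges);
    assert_eq!(pick_strategy(false, 5_000), SyncStrategy::Batched);
    assert_eq!(pick_strategy(false, 500_000), SyncStrategy::BulkNotification);
}
```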
Performance comparison:

| Method | Cost for 1M Entries | Network | Time | Memory |
|---|---|---|---|---|
| Individual messages | 500MB | High | 10 min | Low |
| Batched (1K chunks) | 50MB (compressed) | Medium | 2 min | Medium |
| Bulk notification + lazy | 1KB notification | Minimal | Async | Low |
| Database snapshot | 150MB | One-time | 30 sec | High |
```rust
impl TransactionManager {
    /// Commit bulk entries (indexer use case)
    pub async fn commit_bulk_entries(
        &self,
        library: Arc<Library>,
        location_id: Uuid,
        entries: Vec<entry::ActiveModel>,
    ) -> Result<()> {
        // Capture the count before `entries` is moved into the insert
        let entry_count = entries.len();

        // Write to database
        bulk_insert(entries).await?;

        // Don't create 1M sync messages!
        // Instead: emit a single bulk-completion event;
        // the SyncService handles the efficient broadcast
        self.event_bus.emit(Event::BulkIndexComplete {
            device_id: MY_DEVICE_ID,
            location_id,
            entry_count,
        });

        Ok(())
    }
}
```
```rust
impl SyncService {
    async fn on_bulk_index_complete(&self, event: BulkIndexComplete) -> Result<()> {
        // Decide strategy based on peer state
        if self.is_initial_sync() {
            // Offer a database snapshot
            self.broadcast_snapshot_available().await?;
        } else {
            // Send batched state (helper name hypothetical)
            let records = self.load_state_records(&event).await?;
            self.broadcast_bulk_state("entry", records).await?;
        }
        Ok(())
    }
}
```
Old approach: bulk operations recorded in a sync log with sequence numbers.
New approach: efficient state batching, no central log.
Changes needed:
- core/src/infra/sync/NEW_SYNC.md - Leaderless architecture