.tasks/core/WATCH-001-platform-agnostic-event-system.md
Implement the platform-agnostic event system that normalizes filesystem events across macOS, Linux, and Windows. The system provides a clean API for watching paths and receiving events via broadcast channels, with reference counting for shared watches.
Main watcher interface with lifecycle management:
```rust
pub struct FsWatcher {
    // Notify backend (platform-specific)
    watcher: Arc<Mutex<RecommendedWatcher>>,
    // Watched paths with reference counts
    watches: Arc<RwLock<HashMap<PathBuf, WatchState>>>,
    // Broadcast channel for events
    event_tx: broadcast::Sender<FsEvent>,
    // Metrics
    events_received: AtomicU64,
    events_emitted: AtomicU64,
}

impl FsWatcher {
    pub fn new(config: WatcherConfig) -> Self;
    pub async fn start(&self) -> Result<()>;
    pub async fn stop(&self) -> Result<()>;
    pub async fn watch(&self, path: impl AsRef<Path>, config: WatchConfig) -> Result<WatchHandle>;
    pub fn subscribe(&self) -> broadcast::Receiver<FsEvent>;
    pub fn events_received(&self) -> u64;
    pub fn events_emitted(&self) -> u64;
}
```
Normalized event type emitted to consumers:
```rust
pub struct FsEvent {
    pub path: PathBuf,
    pub kind: FsEventKind,
    pub timestamp: SystemTime,
    pub is_directory: Option<bool>, // Avoids extra metadata calls
}

pub enum FsEventKind {
    Create,
    Modify,
    Remove,
    Rename { from: PathBuf, to: PathBuf },
}

impl FsEvent {
    pub fn is_dir(&self) -> Option<bool>;
    pub fn is_file(&self) -> Option<bool>;
}
```
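The two helper signatures above leave their bodies open. A minimal sketch of how they could be derived from `is_directory`, assuming that any entry known not to be a directory is treated as a file (that rule is an assumption, not something the spec states):

```rust
use std::path::PathBuf;
use std::time::SystemTime;

#[derive(Debug, Clone, PartialEq)]
pub enum FsEventKind {
    Create,
    Modify,
    Remove,
    Rename { from: PathBuf, to: PathBuf },
}

#[derive(Debug, Clone)]
pub struct FsEvent {
    pub path: PathBuf,
    pub kind: FsEventKind,
    pub timestamp: SystemTime,
    pub is_directory: Option<bool>,
}

impl FsEvent {
    /// `Some(true)` for a directory, `Some(false)` for a non-directory,
    /// `None` when the backend did not report the entry type.
    pub fn is_dir(&self) -> Option<bool> {
        self.is_directory
    }

    /// Assumption: anything known not to be a directory counts as a file.
    /// Stays `None` when the entry type is unknown.
    pub fn is_file(&self) -> Option<bool> {
        self.is_directory.map(|is_dir| !is_dir)
    }
}
```

Returning `Option<bool>` rather than `bool` lets a consumer tell "not a directory" apart from "unknown" and fall back to a metadata call only in the second case.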
Per-path watch configuration:
```rust
pub struct WatchConfig {
    pub recursive: bool, // Recursive vs shallow
    pub filters: EventFilters,
}

pub struct EventFilters {
    pub skip_hidden: bool,
    pub skip_system_files: bool,
    pub skip_temp_files: bool,
    pub skip_patterns: Vec<String>,      // Custom patterns (e.g., "node_modules")
    pub important_dotfiles: Vec<String>, // Preserve important dotfiles
}

impl WatchConfig {
    pub fn recursive() -> Self; // Default recursive watch
    pub fn shallow() -> Self;   // Shallow watch (for ephemeral browsing)
    pub fn with_filters(self, filters: EventFilters) -> Self;
}
```
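The builder-style constructors could be filled in roughly as follows. This is a sketch only: the `Default` values for `EventFilters`, including the two example entries in `important_dotfiles`, are assumptions chosen for illustration and are not taken from the spec.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct EventFilters {
    pub skip_hidden: bool,
    pub skip_system_files: bool,
    pub skip_temp_files: bool,
    pub skip_patterns: Vec<String>,
    pub important_dotfiles: Vec<String>,
}

impl Default for EventFilters {
    // Assumed defaults: skip all noise categories, keep a couple of dotfiles.
    fn default() -> Self {
        Self {
            skip_hidden: true,
            skip_system_files: true,
            skip_temp_files: true,
            skip_patterns: Vec::new(),
            important_dotfiles: vec![".gitignore".to_string(), ".env".to_string()],
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct WatchConfig {
    pub recursive: bool,
    pub filters: EventFilters,
}

impl WatchConfig {
    /// Recursive watch with the default filters.
    pub fn recursive() -> Self {
        Self { recursive: true, filters: EventFilters::default() }
    }

    /// Shallow watch with the default filters.
    pub fn shallow() -> Self {
        Self { recursive: false, filters: EventFilters::default() }
    }

    /// Replace the filters, consuming and returning the config.
    pub fn with_filters(mut self, filters: EventFilters) -> Self {
        self.filters = filters;
        self
    }
}
```

With this shape a caller can write `WatchConfig::shallow().with_filters(custom)` and override only the fields that differ from the defaults by using struct update syntax on `EventFilters::default()`.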
Multiple watches on the same path share OS resources:
```rust
struct WatchState {
    refcount: usize,
    config: WatchConfig,
    handle: WatchHandle,
}

// When watch() is called:
// 1. Check if path already watched
// 2. If yes, increment refcount
// 3. If no, register with OS watcher
// 4. Return handle that decrements on drop
```
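The four steps above can be sketched with a small reference-counted registry. This is a simplified illustration, not the crate's implementation: `WatchRegistry` and `refcount` are hypothetical names, the map stores only a count instead of a full `WatchState`, and the points where a real watcher would register with or unregister from the OS backend are marked in comments.

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

/// Hypothetical registry: maps each watched path to its reference count.
#[derive(Clone, Default)]
pub struct WatchRegistry {
    counts: Arc<Mutex<HashMap<PathBuf, usize>>>,
}

/// Handle returned by `watch`; dropping it releases one reference.
pub struct WatchHandle {
    path: PathBuf,
    counts: Arc<Mutex<HashMap<PathBuf, usize>>>,
}

impl WatchRegistry {
    /// Returns the handle plus a flag that is true only for the first watch
    /// on this path (where a real implementation would call the OS watcher).
    pub fn watch(&self, path: impl AsRef<Path>) -> (WatchHandle, bool) {
        let path = path.as_ref().to_path_buf();
        let mut counts = self.counts.lock().unwrap();
        let count = counts.entry(path.clone()).or_insert(0);
        *count += 1;
        let first = *count == 1;
        let handle = WatchHandle { path, counts: Arc::clone(&self.counts) };
        (handle, first)
    }

    /// Current number of live handles for `path` (0 if not watched).
    pub fn refcount(&self, path: impl AsRef<Path>) -> usize {
        let path: &Path = path.as_ref();
        self.counts.lock().unwrap().get(path).copied().unwrap_or(0)
    }
}

impl Drop for WatchHandle {
    fn drop(&mut self) {
        let mut counts = self.counts.lock().unwrap();
        let remaining = match counts.get_mut(&self.path) {
            Some(count) => {
                *count -= 1;
                *count
            }
            None => return,
        };
        if remaining == 0 {
            // Last reference gone: a real implementation would unregister
            // the path from the OS watcher here.
            counts.remove(&self.path);
        }
    }
}
```

Tying the decrement to `Drop` means callers cannot forget to release a watch; the OS-level watch disappears when the last handle goes out of scope.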
Benefits:
Default filters skip noise:
```rust
fn should_emit_event(event: &FsEvent, filters: &EventFilters) -> bool {
    // Paths with no UTF-8 file name cannot match any filter below, so emit them.
    let Some(name) = event.path.file_name().and_then(|n| n.to_str()) else {
        return true;
    };

    // Skip temp files
    if filters.skip_temp_files
        && (name.ends_with(".tmp")
            || name.ends_with(".temp")
            || name.starts_with('~')
            || name.ends_with(".swp"))
    {
        return false;
    }

    // Skip system files
    if filters.skip_system_files && matches!(name, ".DS_Store" | "Thumbs.db" | "desktop.ini") {
        return false;
    }

    // Skip hidden files (except important dotfiles)
    if filters.skip_hidden
        && name.starts_with('.')
        && !filters.important_dotfiles.iter().any(|dotfile| dotfile == name)
    {
        return false;
    }

    // Skip custom patterns (exact match on the file name)
    if filters.skip_patterns.iter().any(|pattern| pattern == name) {
        return false;
    }

    true
}
```
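The custom-pattern check above compares each pattern against the final path component only, so a pattern such as "node_modules" matches the directory entry itself but not files inside it. If the intent is to skip everything under a matching directory, one option is to test every path component. The helper below is a hypothetical sketch of that variant (`matches_skip_pattern` is not part of the spec):

```rust
use std::path::Path;

/// Returns true when any component of `path` equals one of `patterns`.
/// Hypothetical helper: matches whole component names, not substrings.
pub fn matches_skip_pattern(path: &Path, patterns: &[String]) -> bool {
    path.components().any(|component| {
        component
            .as_os_str()
            .to_str()
            // Non-UTF-8 components cannot equal a `String` pattern.
            .map_or(false, |name| patterns.iter().any(|p| p == name))
    })
}
```

Because the comparison is per component, a directory named `my_node_modules_backup` is not affected by the pattern "node_modules".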
The watcher uses broadcast channels for multiple consumers:
```rust
// Watcher broadcasts events
let (event_tx, _) = broadcast::channel(10_000);

// Each consumer gets its own receiver
let rx1 = watcher.subscribe(); // PersistentIndexService
let rx2 = watcher.subscribe(); // EphemeralIndexService
```
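Important: consumers must not block inside the receiver loop. The broadcast channel has a fixed capacity, so a receiver that falls behind gets `RecvError::Lagged` and loses the oldest events. Forward events to an internal batching queue instead:
```rust
// Good pattern for PersistentIndexService
let mut rx = watcher.subscribe();
let (batch_tx, mut batch_rx) = mpsc::channel(100_000);

// Fast receiver: only filters and forwards, no slow work
tokio::spawn(async move {
    loop {
        match rx.recv().await {
            Ok(event) if is_in_my_scope(&event) => {
                let _ = batch_tx.send(event).await;
            }
            Ok(_) => {}
            // Fell behind: `skipped` events were dropped; keep receiving
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                eprintln!("receiver lagged, {skipped} events dropped");
            }
            // All senders dropped: the watcher is gone
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
});

// Worker handles batching and DB writes
tokio::spawn(async move {
    // Drain batch_rx: batch events, coalesce, write to DB...
});
```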
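The "coalesce" step in the worker is not specified further. One simple policy, shown here as a sketch under that assumption, keeps only the most recent event per path while preserving the order in which paths first appeared. The `FsEvent` below is a trimmed-down stand-in with only the fields the sketch needs, and `coalesce` is a hypothetical function name.

```rust
use std::collections::HashMap;
use std::path::PathBuf;

#[derive(Debug, Clone, PartialEq)]
pub enum FsEventKind {
    Create,
    Modify,
    Remove,
    Rename { from: PathBuf, to: PathBuf },
}

/// Trimmed-down event for illustration (timestamp and is_directory omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct FsEvent {
    pub path: PathBuf,
    pub kind: FsEventKind,
}

/// Keep only the latest event per path, ordered by each path's first appearance.
pub fn coalesce(batch: Vec<FsEvent>) -> Vec<FsEvent> {
    // Maps a path to its position in `out`.
    let mut slot_of: HashMap<PathBuf, usize> = HashMap::new();
    let mut out: Vec<FsEvent> = Vec::new();
    for event in batch {
        match slot_of.get(&event.path).copied() {
            // Path already seen in this batch: overwrite the older event.
            Some(slot) => out[slot] = event,
            None => {
                slot_of.insert(event.path.clone(), out.len());
                out.push(event);
            }
        }
    }
    out
}
```

A production policy would likely need more care than "last event wins", for example treating a create followed by a remove as a no-op, or tracking both paths of a rename; this sketch only shows the basic shape of the step.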
- crates/fs-watcher/src/lib.rs - Public API exports
- crates/fs-watcher/src/watcher.rs - FsWatcher implementation
- crates/fs-watcher/src/event.rs - FsEvent and FsEventKind
- crates/fs-watcher/src/config.rs - WatchConfig and EventFilters
- crates/fs-watcher/src/error.rs - WatcherError types

```rust
use sd_fs_watcher::{FsEventKind, FsWatcher, WatchConfig, WatcherConfig};

#[tokio::main]
async fn main() -> Result<()> {
    // Create and start watcher
    let watcher = FsWatcher::new(WatcherConfig::default());
    watcher.start().await?;

    // Subscribe to events
    let mut rx = watcher.subscribe();

    // Watch directory recursively; the watch lasts as long as the handle lives
    let _handle = watcher.watch("/path/to/watch", WatchConfig::recursive()).await?;

    // Process events
    while let Ok(event) = rx.recv().await {
        match event.kind {
            FsEventKind::Create => println!("Created: {:?}", event.path),
            FsEventKind::Modify => println!("Modified: {:?}", event.path),
            FsEventKind::Remove => println!("Removed: {:?}", event.path),
            FsEventKind::Rename { from, to } => {
                println!("Renamed: {:?} -> {:?}", from, to);
            }
        }
    }

    Ok(())
}
```
Located in crates/fs-watcher/src/:
- test_reference_counting - Verify watch refcounts
- test_event_filtering - Verify filters work
- test_recursive_vs_shallow - Verify watch modes
- test_broadcast_multiple_consumers - Verify multiple receivers work

Located in crates/fs-watcher/tests/:
- test_create_event - Verify create events emitted
- test_modify_event - Verify modify events emitted
- test_remove_event - Verify remove events emitted
- test_rename_event - Verify rename detection (platform-specific)