Skip to main content

Mountain/Environment/
FileWatcherProvider.rs

1//! # FileWatcherProvider (Environment)
2//!
3//! Backing implementation of
4//! [`FileWatcherProvider`](CommonLibrary::FileSystem::FileWatcherProvider)
5//! for [`MountainEnvironment`].
6//!
7//! Native filesystem notifications are delegated to the `notify` crate, which
8//! picks up inotify on Linux, FSEvents on macOS, and ReadDirectoryChangesW
9//! on Windows. Events from the watcher thread flow through an unbounded
10//! channel into a tokio task that forwards them back to Cocoon over the
11//! reverse-RPC channel as `$fileWatcher:event` notifications.
12//!
13//! # Concurrency notes
14//!
//! - `notify::recommended_watcher` executes callbacks on its own native thread,
//!   so we tunnel events through an unbounded channel before touching async
//!   code. The forwarder task is spawned once, the first time the shared
//!   `WatcherState` is requested, and lives for the entire process lifetime.
19//! - macOS FSEvents may emit duplicate Create/Change events for the same path
20//!   in very short succession. We debounce by path within a 100 ms window
21//!   per-handle, keyed on `(handle, path, kind)`.
//! - Linux inotify has a small per-user watch cap
//!   (`fs.inotify.max_user_watches`); hitting it surfaces as
//!   `notify::ErrorKind::MaxFilesWatch`. The error's message is passed back to
//!   the caller (wrapped in `CommonError::Unknown`) so the UI can show a
//!   guidance message.
26
27use std::{
28	collections::HashMap,
29	path::PathBuf,
30	sync::{Arc, Mutex as StandardMutex},
31	time::{Duration, Instant},
32};
33
34use CommonLibrary::{
35	Environment::Requires::Requires,
36	Error::CommonError::CommonError,
37	FileSystem::FileWatcherProvider::{FileWatcherProvider, WatchEvent, WatchEventKind},
38	IPC::{IPCProvider::IPCProvider, SkyEvent::SkyEvent},
39};
40use async_trait::async_trait;
41use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
42use serde_json::json;
43use tokio::sync::mpsc as TokioMPSC;
44
45use super::MountainEnvironment::MountainEnvironment;
46use crate::dev_log;
47
/// Minimum gap between two forwarded events with the same `(path, kind)` on a
/// single watcher handle; a repeat arriving inside this window is dropped.
/// Sized to absorb the duplicate bursts FSEvents tends to emit.
const DebounceWindow:Duration = Duration::from_micros(100_000);
51
52/// Internal entry tracked per registered watcher. The `Watcher` handle must
53/// be kept alive for the lifetime of the registration; dropping it releases
54/// the OS resources.
55pub struct WatcherEntry {
56	#[allow(dead_code)]
57	Watcher:RecommendedWatcher,
58	LastSeen:HashMap<(PathBuf, &'static str), Instant>,
59}
60
/// Identity of a watch request for de-duplication: the watched root, whether
/// the watch is recursive, and the optional glob filter.
///
/// Identical triples are frequently registered within milliseconds of each
/// other (typescript-language-features and the git extension are the usual
/// sources). Giving each its own `notify::Watcher` would duplicate the
/// kqueue/inotify subscription tree, the kernel handles it consumes, and the
/// FS-event traffic it generates.
type DedupKey = (
	PathBuf,        // watched root
	bool,           // recursive?
	Option<String>, // glob pattern, if any
);
69
70/// Lazily-initialised process-wide state for file watching. Instances of the
71/// event-forwarder task are singletons keyed on the MountainEnvironment
72/// handle. Access through `WatcherState::Get`.
73pub struct WatcherState {
74	pub Entries:Arc<StandardMutex<HashMap<String, WatcherEntry>>>,
75	pub EventSender:TokioMPSC::UnboundedSender<WatchEvent>,
76	/// Maps `(root, recursive, pattern)` to the primary handle that owns
77	/// the live OS watcher. Subsequent registrations matching the same
78	/// triple are aliased to the primary; only the primary creates a
79	/// notify::Watcher.
80	pub DedupIndex:Arc<StandardMutex<HashMap<DedupKey, String>>>,
81	/// Reverse index: primary handle → all aliased handles. When the
82	/// forwarder task gets an event for a primary, it fans the same
83	/// event out to every aliased handle so each extension's
84	/// `vscode.workspace.createFileSystemWatcher` callback fires once.
85	pub Aliases:Arc<StandardMutex<HashMap<String, Vec<String>>>>,
86	/// Reverse lookup for unregister: any handle (primary or alias) →
87	/// its primary. Lets `UnregisterWatcher` clean up alias entries
88	/// without scanning the entire `Aliases` map.
89	pub HandleToPrimary:Arc<StandardMutex<HashMap<String, String>>>,
90}
91
92impl WatcherState {
93	/// Obtain (or create) the global WatcherState. The forwarder task is
94	/// spawned on first access. Must be called from within a tokio runtime.
95	pub fn Get(env:&MountainEnvironment) -> Arc<WatcherState> {
96		use std::sync::OnceLock;
97
98		// One WatcherState per process - the backing notify watchers are
99		// cheap and multiplex fine, and we want a single forwarder task.
100		static GLOBAL:OnceLock<Arc<WatcherState>> = OnceLock::new();
101		GLOBAL
102			.get_or_init(|| {
103				let (tx, mut rx) = TokioMPSC::unbounded_channel::<WatchEvent>();
104				let state = Arc::new(WatcherState {
105					Entries:Arc::new(StandardMutex::new(HashMap::new())),
106					EventSender:tx,
107					DedupIndex:Arc::new(StandardMutex::new(HashMap::new())),
108					Aliases:Arc::new(StandardMutex::new(HashMap::new())),
109					HandleToPrimary:Arc::new(StandardMutex::new(HashMap::new())),
110				});
111
112				// The forwarder task holds a weak ref to the environment so
113				// it unwinds cleanly if the env is ever torn down. State is
114				// captured by Arc clone for the alias fan-out lookup.
115				let env_clone = env.clone();
116				let state_clone = state.clone();
117				tokio::spawn(async move {
118					use tauri::Emitter;
119					while let Some(WatchEvent { Handle, Kind, Path }) = rx.recv().await {
120						let ipc_provider:Arc<dyn IPCProvider> = env_clone.Require();
121						// Fan events to the primary handle plus every alias
122						// registered against it. Without this, the second
123						// extension to register a duplicate watcher would
124						// silently miss every event.
125						let mut Recipients:Vec<String> = vec![Handle.clone()];
126						if let Ok(AliasGuard) = state_clone.Aliases.lock() {
127							if let Some(AliasList) = AliasGuard.get(&Handle) {
128								Recipients.extend(AliasList.iter().cloned());
129							}
130						}
131						for RecipientHandle in Recipients {
132							let payload = json!({
133								"handle": RecipientHandle,
134								"kind": Kind.AsString(),
135								"path": Path.to_string_lossy().to_string(),
136							});
137							if let Err(error) = ipc_provider
138								.SendNotificationToSideCar(
139									"cocoon-main".to_string(),
140									"$fileWatcher:event".to_string(),
141									payload.clone(),
142								)
143								.await
144							{
145								dev_log!(
146									"filewatcher",
147									"warn: [FileWatcherProvider] Failed to forward event handle={} kind={} path={:?}: \
148									 {:?}",
149									RecipientHandle,
150									Kind.AsString(),
151									Path,
152									error
153								);
154							}
155							// Dual-emit to Wind/Sky so the Explorer tree,
156							// search index, and any other webview-side
157							// consumer can react to disk mutations without
158							// going through Cocoon. Wind's `TauriChannel`
159							// subscribes to `sky://vfs/fileChange` under
160							// the localFilesystem channel. Aliased handles
161							// each get their own emit so per-handle
162							// listeners on the Sky side fire correctly.
163							if let Err(Error) =
164								env_clone.ApplicationHandle.emit(SkyEvent::VFSFileChange.AsStr(), &payload)
165							{
166								dev_log!(
167									"filewatcher",
168									"warn: [FileWatcherProvider] sky://vfs/fileChange emit failed: {}",
169									Error
170								);
171							}
172						}
173					}
174				});
175
176				state
177			})
178			.clone()
179	}
180}
181
182fn MapEventKind(raw:&EventKind) -> Option<WatchEventKind> {
183	match raw {
184		EventKind::Create(_) => Some(WatchEventKind::Create),
185		EventKind::Modify(_) => Some(WatchEventKind::Change),
186		EventKind::Remove(_) => Some(WatchEventKind::Delete),
187		// Access / Any / Other events are not exposed to extensions.
188		_ => None,
189	}
190}
191
192/// Translate a VS Code glob pattern into a `regex::Regex` so the native
193/// watcher can apply the caller's filter before paying for an IPC hop. A
194/// small subset of the glob grammar is supported (`**`, `*`, `?`, `[…]`,
195/// `{…,…}` alternation) - exactly what TypeScript-language-features and
196/// the other ship-time extensions rely on.
197fn CompileGlobToRegex(Pattern:&str) -> Option<regex::Regex> {
198	let mut Regex = String::with_capacity(Pattern.len() * 2 + 4);
199	// Case-insensitive on macOS + Windows where the OS is typically
200	// case-insensitive; on case-sensitive Linux filesystems extensions commonly
201	// still use lowercase patterns, so the flag is safe across all three targets.
202	if cfg!(any(target_os = "macos", target_os = "windows")) {
203		Regex.push_str("(?i)");
204	}
205	Regex.push('^');
206	let mut Chars = Pattern.chars().peekable();
207	let mut InClass = false;
208	while let Some(C) = Chars.next() {
209		if InClass {
210			if C == ']' {
211				InClass = false;
212			}
213			Regex.push(C);
214			continue;
215		}
216		match C {
217			'*' => {
218				if Chars.peek() == Some(&'*') {
219					Chars.next();
220					if Chars.peek() == Some(&'/') {
221						Chars.next();
222						Regex.push_str("(?:.*/)?");
223					} else {
224						Regex.push_str(".*");
225					}
226				} else {
227					Regex.push_str("[^/]*");
228				}
229			},
230			'?' => Regex.push_str("[^/]"),
231			'[' => {
232				Regex.push('[');
233				InClass = true;
234			},
235			'{' => Regex.push_str("(?:"),
236			'}' => Regex.push(')'),
237			',' => Regex.push('|'),
238			'.' | '+' | '(' | ')' | '^' | '$' | '|' | '\\' => {
239				Regex.push('\\');
240				Regex.push(C);
241			},
242			_ => Regex.push(C),
243		}
244	}
245	Regex.push('$');
246	regex::Regex::new(&Regex).ok()
247}
248
249#[async_trait]
250impl FileWatcherProvider for MountainEnvironment {
251	async fn RegisterWatcher(
252		&self,
253		Handle:String,
254		Root:PathBuf,
255		IsRecursive:bool,
256		Pattern:Option<String>,
257	) -> Result<(), CommonError> {
258		let state = WatcherState::Get(self);
259
260		// De-dup pass 1: same handle re-registered (cheap idempotency).
261		{
262			let guard = state
263				.Entries
264				.lock()
265				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
266			if guard.contains_key(&Handle) {
267				dev_log!(
268					"filewatcher",
269					"[FileWatcherProvider] handle={} already registered; skipping duplicate",
270					Handle
271				);
272				return Ok(());
273			}
274		}
275
276		// De-dup pass 2: same (root, recursive, pattern) triple already has
277		// a primary watcher. The git extension, typescript-language-features,
278		// and several `composer.*` extensions all hit this path during boot
279		// (observed: `**/composer.json`, `**/composer.lock`, `**/*.md`,
280		// `**/package.json` registered twice each within ~50ms). Aliasing
281		// avoids the duplicate notify::Watcher / kqueue subscription tree
282		// while still fanning events to every aliased handle.
283		let DedupKeyValue:DedupKey = (Root.clone(), IsRecursive, Pattern.clone());
284		{
285			let DedupGuard = state
286				.DedupIndex
287				.lock()
288				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
289			if let Some(PrimaryHandle) = DedupGuard.get(&DedupKeyValue).cloned() {
290				drop(DedupGuard);
291				let mut AliasGuard = state
292					.Aliases
293					.lock()
294					.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
295				AliasGuard
296					.entry(PrimaryHandle.clone())
297					.or_insert_with(Vec::new)
298					.push(Handle.clone());
299				let mut H2PGuard = state
300					.HandleToPrimary
301					.lock()
302					.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
303				H2PGuard.insert(Handle.clone(), PrimaryHandle.clone());
304				dev_log!(
305					"filewatcher",
306					"[FileWatcherProvider] dedup hit; handle={} aliased to primary={} root={} pattern={:?}",
307					Handle,
308					PrimaryHandle,
309					Root.display(),
310					Pattern
311				);
312				return Ok(());
313			}
314		}
315		// First registration for this triple. The DedupIndex insert
316		// happens AFTER successful OS-watcher creation below so an
317		// errored or benign-absent registration doesn't leave a stale
318		// dedup entry pointing at a non-existent primary.
319
320		let CompiledPattern = Pattern.as_deref().and_then(CompileGlobToRegex);
321		let pattern_for_callback = CompiledPattern.clone();
322
323		// Prepare the per-event callback. It owns clones of the handle and
324		// the forwarder channel; debouncing state lives in the entry under
325		// the global mutex (fine - the callback is not hot).
326		let handle_for_callback = Handle.clone();
327		let sender = state.EventSender.clone();
328		let entries = state.Entries.clone();
329		let mut watcher = notify::recommended_watcher(move |event_result:notify::Result<notify::Event>| {
330			let Ok(event) = event_result else { return };
331			let Some(kind) = MapEventKind(&event.kind) else { return };
332			let kind_tag = kind.AsString();
333
334			// Pattern filter - reject early so the event never crosses IPC.
335			let matched_paths:Vec<PathBuf> = event
336				.paths
337				.into_iter()
338				.filter(|path| {
339					match &pattern_for_callback {
340						Some(re) => re.is_match(&path.to_string_lossy()),
341						None => true,
342					}
343				})
344				.collect();
345			if matched_paths.is_empty() {
346				return;
347			}
348
349			// Debounce per (handle, path, kind). Lock is uncontested for
350			// single-path events; bursts from FSEvents coalesce cleanly.
351			let mut final_paths:Vec<PathBuf> = Vec::with_capacity(matched_paths.len());
352			if let Ok(mut guard) = entries.lock() {
353				if let Some(entry) = guard.get_mut(&handle_for_callback) {
354					let now = Instant::now();
355					entry
356						.LastSeen
357						.retain(|_, instant| now.duration_since(*instant) < Duration::from_secs(10));
358					for path in matched_paths {
359						let key = (path.clone(), kind_tag);
360						let keep = match entry.LastSeen.get(&key) {
361							Some(previous) if now.duration_since(*previous) < DebounceWindow => false,
362							_ => {
363								entry.LastSeen.insert(key, now);
364								true
365							},
366						};
367						if keep {
368							final_paths.push(path);
369						}
370					}
371				} else {
372					return;
373				}
374			} else {
375				return;
376			}
377
378			for path in final_paths {
379				let _ = sender.send(WatchEvent { Handle:handle_for_callback.clone(), Kind:kind, Path:path });
380			}
381		})
382		.map_err(|error| CommonError::Unknown { Description:format!("FileWatcher create failed: {}", error) })?;
383
384		let mode = if IsRecursive { RecursiveMode::Recursive } else { RecursiveMode::NonRecursive };
385		// Watching a non-existent path is a common pattern: extensions
386		// register watchers on optional config dirs (`~/.roo/skills-*`,
387		// `.vscode/settings.json` in fresh workspaces, …) that may appear
388		// later. `notify` returns `Error::PathNotFound` / "No path was
389		// found"; failing the gRPC call counts against Cocoon's circuit
390		// breaker - 5 such probes at boot trip the breaker open and
391		// cascade into 60s of rejected reads. Record a "deferred" entry
392		// without a live OS watcher so Unregister still works; future
393		// events for that path won't fire, but the extension can re-
394		// register once the directory appears, just like in stock VS Code.
395		let WatchResult = watcher.watch(&Root, mode);
396		let mut guard = state
397			.Entries
398			.lock()
399			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
400		let _ = CompiledPattern;
401		match WatchResult {
402			Ok(()) => {
403				guard.insert(Handle.clone(), WatcherEntry { Watcher:watcher, LastSeen:HashMap::new() });
404				// Drop the Entries lock before grabbing DedupIndex to
405				// avoid lock-order divergence vs the alias path (which
406				// takes DedupIndex first). Re-acquire is cheap.
407				drop(guard);
408				if let Ok(mut DedupGuard) = state.DedupIndex.lock() {
409					DedupGuard.entry(DedupKeyValue.clone()).or_insert_with(|| Handle.clone());
410				}
411				dev_log!(
412					"filewatcher",
413					"[FileWatcherProvider] Registered watcher handle={} root={} recursive={} pattern={:?}",
414					Handle,
415					Root.display(),
416					IsRecursive,
417					Pattern
418				);
419				return Ok(());
420			},
421			Err(error) => {
422				let ErrorString = error.to_string().to_lowercase();
423				let IsBenignAbsent = ErrorString.contains("no path was found")
424					|| ErrorString.contains("no such file or directory")
425					|| ErrorString.contains("entity not found")
426					|| ErrorString.contains("path not found")
427					|| ErrorString.contains("os error 2")
428					|| !Root.exists();
429				if IsBenignAbsent {
430					dev_log!(
431						"filewatcher",
432						"[FileWatcherProvider] watch path absent (deferred) handle={} root={} err={}",
433						Handle,
434						Root.display(),
435						error
436					);
437					// Drop watcher (no live subscription); record handle so
438					// Unregister still finds something to remove. We do NOT
439					// reuse the closure's notify::Watcher here.
440					drop(watcher);
441				} else {
442					return Err(CommonError::Unknown {
443						Description:format!("FileWatcher watch failed for {}: {}", Root.display(), error),
444					});
445				}
446			},
447		}
448
449		dev_log!(
450			"filewatcher",
451			"[FileWatcherProvider] Registered watcher handle={} root={} recursive={} pattern={:?}",
452			Handle,
453			Root.display(),
454			IsRecursive,
455			Pattern
456		);
457
458		Ok(())
459	}
460
461	async fn UnregisterWatcher(&self, Handle:String) -> Result<(), CommonError> {
462		let state = WatcherState::Get(self);
463
464		// Step 1: alias removal. If the handle was aliased to a primary,
465		// just remove it from the alias list and the lookup map. The OS
466		// watcher stays alive because the primary still owns it.
467		let MaybePrimary = {
468			let mut H2PGuard = state
469				.HandleToPrimary
470				.lock()
471				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
472			H2PGuard.remove(&Handle)
473		};
474		if let Some(PrimaryHandle) = MaybePrimary {
475			let mut AliasGuard = state
476				.Aliases
477				.lock()
478				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
479			if let Some(AliasList) = AliasGuard.get_mut(&PrimaryHandle) {
480				AliasList.retain(|EntryHandle| EntryHandle != &Handle);
481				if AliasList.is_empty() {
482					AliasGuard.remove(&PrimaryHandle);
483				}
484			}
485			dev_log!(
486				"filewatcher",
487				"[FileWatcherProvider] Unregistered alias handle={} primary={}",
488				Handle,
489				PrimaryHandle
490			);
491			return Ok(());
492		}
493
494		// Step 2: primary removal. Drop the OS watcher and clear the
495		// dedup index entry. Any still-aliased handles are left dangling -
496		// callers requesting a primary unregister while aliases still
497		// exist is unusual but not fatal; the alias entries simply
498		// stop receiving events.
499		let mut Guard = state
500			.Entries
501			.lock()
502			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
503		if Guard.remove(&Handle).is_some() {
504			dev_log!("filewatcher", "[FileWatcherProvider] Unregistered watcher handle={}", Handle);
505		}
506		drop(Guard);
507
508		// Clear the dedup-index entry pointing at this primary so a
509		// future registration for the same triple opens a fresh OS
510		// watcher rather than aliasing to a removed handle.
511		let mut DedupGuard = state
512			.DedupIndex
513			.lock()
514			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
515		DedupGuard.retain(|_, PrimaryHandle| PrimaryHandle != &Handle);
516		Ok(())
517	}
518}