AirLibrary/Indexing/Background/
StartWatcher.rs

1//! # StartWatcher
2//!
3//! ## File: Indexing/Background/StartWatcher.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides background task management for the File Indexer service,
8//! handling file watching startup and periodic indexing tasks.
9//!
10//! ## Primary Responsibility
11//!
12//! Start and manage background file watcher and periodic indexing tasks
13//! for the indexing service.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File watcher initialization and lifecycle management
18//! - Periodic background re-indexing
19//! - Watcher event debouncing
20//! - Background task cleanup
21//!
22//! ## Dependencies
23//!
24//! **External Crates:**
25//! - `notify` - File system watching
26//! - `tokio` - Async runtime for background tasks
27//!
28//! **Internal Modules:**
29//! - `crate::Result` - Error handling type
30//! - `crate::AirError` - Error types
31//! - `crate::ApplicationState::ApplicationState` - Application state
32//! - `super::super::FileIndexer` - Main file indexer
33//! - `crate::Indexing::Watch::WatchFile` - File watching operations
34//!
35//! ## Dependents
36//!
37//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
38//!
39//! ## VSCode Pattern Reference
40//!
41//! Inspired by VSCode's background services in
42//! `src/vs/workbench/services/search/common/`
43//!
44//! ## Security Considerations
45//!
46//! - Path validation before watching
47//! - Watch path limits enforcement
48//! - Permission checking on watch paths
49//!
50//! ## Performance Considerations
51//!
52//! - Event debouncing prevents excessive re-indexing
53//! - Parallel processing of file changes
54//! - Efficient background task scheduling
55//!
56//! ## Error Handling Strategy
57//!
58//! Background tasks log errors and continue running, ensuring
59//! temporary failures don't stop the indexing service.
60//!
61//! ## Thread Safety
62//!
63//! Background tasks use Arc for shared state and async/await
64//! for safe concurrent operations.
65
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69	sync::{Mutex, RwLock, Semaphore},
70	task::JoinHandle,
71};
72
73use crate::{
74	AirError,
75	ApplicationState::ApplicationState,
76	Indexing::State::CreateState::FileIndex,
77	Result,
78};
79
80/// Maximum number of parallel watch event processors; used as the permit count of `indexing_semaphore`.
81const MAX_WATCH_PROCESSORS:usize = 5;
82
83/// Shared state handed to the file watcher, the debounce processor and the periodic indexing task.
84pub struct BackgroundIndexerContext {
85	/// Application state; its `Configuration.Indexing` section is read by the background tasks
86	pub app_state:Arc<ApplicationState>,
87	/// File index handed to the debounced handler when pending changes are processed
88	pub file_index:Arc<RwLock<FileIndex>>,
89	/// Corruption flag; while `true` the watcher callback and the background loops skip their work
90	pub corruption_detected:Arc<Mutex<bool>>,
91	/// Active file watcher; `None` until `StartFileWatcher` stores one and again after `StopFileWatcher`
92	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
93	/// Semaphore created with `MAX_WATCH_PROCESSORS` permits (NOTE(review): no acquire is visible in this module)
94	pub indexing_semaphore:Arc<Semaphore>,
95	/// Debounced event handler that receives changes from the watcher callback until they are processed
96	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
97}
98
99impl BackgroundIndexerContext {
100	pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101		Self {
102			app_state,
103			file_index,
104			corruption_detected:Arc::new(Mutex::new(false)),
105			file_watcher:Arc::new(Mutex::new(None)),
106			indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
107			debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
108		}
109	}
110}
111
112/// Start file watcher for incremental indexing
113///
114/// Monitors file system changes and updates index in real-time.
115/// This enables:
116/// - Real-time search updates
117/// - Automatic reindexing of changed files
118/// - Removal of deleted files from index
119pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
120	use notify::Watcher;
121
122	let index = context.file_index.clone();
123	let corruption_flag = context.corruption_detected.clone();
124	let config = context.app_state.Configuration.Indexing.clone();
125	let debounced_handler = context.debounced_handler.clone();
126
127	// Create and start the watcher
128	let mut watcher:notify::RecommendedWatcher = Watcher::new(
129		move |res:std::result::Result<notify::Event, notify::Error>| {
130			if let Ok(event) = res {
131				// Check corruption flag before processing events
132				if *corruption_flag.blocking_lock() {
133					log::warn!("[StartWatcher] Skipping file event - index marked as corrupted");
134					return;
135				}
136
137				let index = index.clone();
138				// Variables cloned for use in async task
139				let _index = index.clone();
140				let debounced_handler = debounced_handler.clone();
141				let _config_clone = config.clone();
142
143				tokio::spawn(async move {
144					// Convert event to change type and add to debounced handler
145					if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
146						for path in &event.paths {
147							if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
148								path,
149								&crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
150							) {
151								debounced_handler.AddChange(path.clone(), change_type).await;
152							}
153						}
154					}
155				});
156			}
157		},
158		notify::Config::default(),
159	)
160	.map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
161
162	// Watch all specified paths
163	for path in &paths {
164		if path.exists() {
165			match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
166				Ok(()) => {
167					watcher
168						.watch(path, notify::RecursiveMode::Recursive)
169						.map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
170					log::info!("[StartWatcher] Watching path: {}", path.display());
171				},
172				Err(e) => {
173					log::warn!("[StartWatcher] Skipping invalid watch path {}: {}", path.display(), e);
174				},
175			}
176		}
177	}
178
179	*context.file_watcher.lock().await = Some(watcher);
180
181	log::info!("[StartWatcher] File watcher started successfully for {} paths", paths.len());
182
183	Ok(())
184}
185
186/// Start the debounce processor task
187pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
188	tokio::spawn(async move {
189		log::info!("[StartWatcher] Debounce processor started");
190
191		let interval = Duration::from_millis(100); // Process every 100ms
192		// Debounce age cutoff
193		let debounce_cutoff = Duration::from_millis(500);
194
195		loop {
196			tokio::time::sleep(interval).await;
197			{
198				// Check corruption flag
199				if *context.corruption_detected.lock().await {
200					log::warn!("[StartWatcher] Index corrupted, pausing debounce processing");
201					tokio::time::sleep(Duration::from_secs(5)).await;
202					continue;
203				}
204
205				// Process pending changes
206				let config = context.app_state.Configuration.Indexing.clone();
207
208				match context
209					.debounced_handler
210					.ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
211					.await
212				{
213					Ok(changes) => {
214						if !changes.is_empty() {
215							log::debug!("[StartWatcher] Processed {} debounced changes", changes.len());
216						}
217					},
218					Err(e) => {
219						log::error!("[StartWatcher] Failed to process pending changes: {}", e);
220					},
221				}
222			}
223		}
224	})
225}
226
227/// Start background tasks for periodic indexing
228pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
229	let config = &context.app_state.Configuration.Indexing;
230
231	if !config.Enabled {
232		log::info!("[StartWatcher] Background indexing disabled in configuration");
233		return Err(AirError::Configuration("Background indexing is disabled".to_string()));
234	}
235
236	let handle = tokio::spawn(BackgroundTask(context));
237
238	log::info!("[StartWatcher] Background tasks started");
239
240	Ok(handle)
241}
242
243/// Log that background tasks are being stopped; this function does not cancel any task itself.
244pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
245	log::info!("[StartWatcher] Stopping background tasks");
246	// NOTE: dropping a tokio `JoinHandle` detaches the task instead of cancelling it, so callers
	// must call `abort()` on the handles returned by `StartAll` / `StartBackgroundTasks` to stop them.
247}
248
249/// Stop file watcher
250pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
251	if let Some(watcher) = context.file_watcher.lock().await.take() {
252		drop(watcher);
253		log::info!("[StartWatcher] File watcher stopped");
254	}
255}
256
257/// Background task for periodic indexing
258async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
259	let config = context.app_state.Configuration.Indexing.clone();
260
261	let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
262	let mut interval = tokio::time::interval(interval);
263
264	log::info!(
265		"[StartWatcher] Background indexing configured for {} minute intervals",
266		config.UpdateIntervalMinutes
267	);
268
269	loop {
270		interval.tick().await;
271		{
272			// Check corruption flag
273			if *context.corruption_detected.lock().await {
274				log::warn!("[StartWatcher] Index corrupted, skipping background update");
275				continue;
276			}
277
278			log::info!("[StartWatcher] Running periodic background index...");
279
280			// Re-index configured directories
281			let directories = config.IndexDirectory.clone();
282			if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
283			{
284				log::error!("[StartWatcher] Background indexing failed: {}", e);
285			}
286		}
287	}
288}
289
290/// Get watcher status
291pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
292	let is_running = context.file_watcher.lock().await.is_some();
293	let pending_count = context.debounced_handler.PendingCount().await;
294	let is_corrupted = *context.corruption_detected.lock().await;
295
296	WatcherStatus { is_running, pending_count, is_corrupted }
297}
298
299/// Snapshot of the watcher state as assembled by `GetWatcherStatus`
300#[derive(Debug, Clone)]
301pub struct WatcherStatus {
	/// `true` while a watcher instance is stored in the context
302	pub is_running:bool,
	/// Number of changes the debounced handler reported as pending
303	pub pending_count:usize,
	/// Value of the context's corruption flag when the snapshot was taken
304	pub is_corrupted:bool,
305}
306
307/// Start all background components (watcher and tasks)
308pub async fn StartAll(
309	context:Arc<BackgroundIndexerContext>,
310	watch_paths:Vec<PathBuf>,
311) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
312	let watcher_handle = if config_watch_enabled(&context) {
313		match StartFileWatcher(&context, watch_paths).await {
314			Ok(()) => {
315				// Start debounce processor
316				Some(StartDebounceProcessor(context.clone()))
317			},
318			Err(e) => {
319				log::error!("[StartWatcher] Failed to start file watcher: {}", e);
320				None
321			},
322		}
323	} else {
324		None
325	};
326
327	let background_handle = match StartBackgroundTasks(context.clone()).await {
328		Ok(handle) => Some(handle),
329		Err(e) => {
330			log::warn!("[StartWatcher] Failed to start background tasks: {}", e);
331			None
332		},
333	};
334
335	Ok((watcher_handle, background_handle))
336}
337
338/// Stop all background components: calls `StopBackgroundTasks` (which only logs) and then `StopFileWatcher`.
///
/// Spawned tasks are not aborted here; this function only removes the stored watcher.
339pub async fn StopAll(context:&BackgroundIndexerContext) {
340	StopBackgroundTasks(context).await;
341	StopFileWatcher(context).await;
342}
343
344/// Check if watching is enabled in configuration
345fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }