AirLibrary/Indexing/Background/
StartWatcher.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69 sync::{Mutex, RwLock, Semaphore},
70 task::JoinHandle,
71};
72
73use crate::{
74 AirError,
75 ApplicationState::ApplicationState,
76 Indexing::State::CreateState::FileIndex,
77 Result,
78};
79
80const MAX_WATCH_PROCESSORS:usize = 5;
82
83pub struct BackgroundIndexerContext {
85 pub app_state:Arc<ApplicationState>,
87 pub file_index:Arc<RwLock<FileIndex>>,
89 pub corruption_detected:Arc<Mutex<bool>>,
91 pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
93 pub indexing_semaphore:Arc<Semaphore>,
95 pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
97}
98
99impl BackgroundIndexerContext {
100 pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101 Self {
102 app_state,
103 file_index,
104 corruption_detected:Arc::new(Mutex::new(false)),
105 file_watcher:Arc::new(Mutex::new(None)),
106 indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
107 debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
108 }
109 }
110}
111
112pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
120 use notify::Watcher;
121
122 let index = context.file_index.clone();
123 let corruption_flag = context.corruption_detected.clone();
124 let config = context.app_state.Configuration.Indexing.clone();
125 let debounced_handler = context.debounced_handler.clone();
126
127 let mut watcher:notify::RecommendedWatcher = Watcher::new(
129 move |res:std::result::Result<notify::Event, notify::Error>| {
130 if let Ok(event) = res {
131 if *corruption_flag.blocking_lock() {
133 log::warn!("[StartWatcher] Skipping file event - index marked as corrupted");
134 return;
135 }
136
137 let index = index.clone();
138 let _index = index.clone();
140 let debounced_handler = debounced_handler.clone();
141 let _config_clone = config.clone();
142
143 tokio::spawn(async move {
144 if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
146 for path in &event.paths {
147 if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
148 path,
149 &crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
150 ) {
151 debounced_handler.AddChange(path.clone(), change_type).await;
152 }
153 }
154 }
155 });
156 }
157 },
158 notify::Config::default(),
159 )
160 .map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
161
162 for path in &paths {
164 if path.exists() {
165 match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
166 Ok(()) => {
167 watcher
168 .watch(path, notify::RecursiveMode::Recursive)
169 .map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
170 log::info!("[StartWatcher] Watching path: {}", path.display());
171 },
172 Err(e) => {
173 log::warn!("[StartWatcher] Skipping invalid watch path {}: {}", path.display(), e);
174 },
175 }
176 }
177 }
178
179 *context.file_watcher.lock().await = Some(watcher);
180
181 log::info!("[StartWatcher] File watcher started successfully for {} paths", paths.len());
182
183 Ok(())
184}
185
186pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
188 tokio::spawn(async move {
189 log::info!("[StartWatcher] Debounce processor started");
190
191 let interval = Duration::from_millis(100); let debounce_cutoff = Duration::from_millis(500);
194
195 loop {
196 tokio::time::sleep(interval).await;
197 {
198 if *context.corruption_detected.lock().await {
200 log::warn!("[StartWatcher] Index corrupted, pausing debounce processing");
201 tokio::time::sleep(Duration::from_secs(5)).await;
202 continue;
203 }
204
205 let config = context.app_state.Configuration.Indexing.clone();
207
208 match context
209 .debounced_handler
210 .ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
211 .await
212 {
213 Ok(changes) => {
214 if !changes.is_empty() {
215 log::debug!("[StartWatcher] Processed {} debounced changes", changes.len());
216 }
217 },
218 Err(e) => {
219 log::error!("[StartWatcher] Failed to process pending changes: {}", e);
220 },
221 }
222 }
223 }
224 })
225}
226
227pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
229 let config = &context.app_state.Configuration.Indexing;
230
231 if !config.Enabled {
232 log::info!("[StartWatcher] Background indexing disabled in configuration");
233 return Err(AirError::Configuration("Background indexing is disabled".to_string()));
234 }
235
236 let handle = tokio::spawn(BackgroundTask(context));
237
238 log::info!("[StartWatcher] Background tasks started");
239
240 Ok(handle)
241}
242
243pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
245 log::info!("[StartWatcher] Stopping background tasks");
246 }
248
249pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
251 if let Some(watcher) = context.file_watcher.lock().await.take() {
252 drop(watcher);
253 log::info!("[StartWatcher] File watcher stopped");
254 }
255}
256
257async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
259 let config = context.app_state.Configuration.Indexing.clone();
260
261 let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
262 let mut interval = tokio::time::interval(interval);
263
264 log::info!(
265 "[StartWatcher] Background indexing configured for {} minute intervals",
266 config.UpdateIntervalMinutes
267 );
268
269 loop {
270 interval.tick().await;
271 {
272 if *context.corruption_detected.lock().await {
274 log::warn!("[StartWatcher] Index corrupted, skipping background update");
275 continue;
276 }
277
278 log::info!("[StartWatcher] Running periodic background index...");
279
280 let directories = config.IndexDirectory.clone();
282 if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
283 {
284 log::error!("[StartWatcher] Background indexing failed: {}", e);
285 }
286 }
287 }
288}
289
290pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
292 let is_running = context.file_watcher.lock().await.is_some();
293 let pending_count = context.debounced_handler.PendingCount().await;
294 let is_corrupted = *context.corruption_detected.lock().await;
295
296 WatcherStatus { is_running, pending_count, is_corrupted }
297}
298
299#[derive(Debug, Clone)]
301pub struct WatcherStatus {
302 pub is_running:bool,
303 pub pending_count:usize,
304 pub is_corrupted:bool,
305}
306
307pub async fn StartAll(
309 context:Arc<BackgroundIndexerContext>,
310 watch_paths:Vec<PathBuf>,
311) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
312 let watcher_handle = if config_watch_enabled(&context) {
313 match StartFileWatcher(&context, watch_paths).await {
314 Ok(()) => {
315 Some(StartDebounceProcessor(context.clone()))
317 },
318 Err(e) => {
319 log::error!("[StartWatcher] Failed to start file watcher: {}", e);
320 None
321 },
322 }
323 } else {
324 None
325 };
326
327 let background_handle = match StartBackgroundTasks(context.clone()).await {
328 Ok(handle) => Some(handle),
329 Err(e) => {
330 log::warn!("[StartWatcher] Failed to start background tasks: {}", e);
331 None
332 },
333 };
334
335 Ok((watcher_handle, background_handle))
336}
337
338pub async fn StopAll(context:&BackgroundIndexerContext) {
340 StopBackgroundTasks(context).await;
341 StopFileWatcher(context).await;
342}
343
344fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }