AirLibrary/Indexing/Store/
UpdateIndex.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
71
72use tokio::{
73 sync::{RwLock, Semaphore},
74 time::Instant,
75};
76
77use crate::{
78 AirError,
79 Configuration::IndexingConfig,
80 Indexing::State::CreateState::{FileIndex, FileMetadata},
81 Result,
82};
83
84pub async fn UpdateSingleFile(
86 index:&mut FileIndex,
87 file_path:&PathBuf,
88 config:&IndexingConfig,
89) -> Result<Option<FileMetadata>> {
90 let start_time = Instant::now();
91
92 if !file_path.exists() {
94 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
96 log::debug!("[UpdateIndex] Removed deleted file: {}", file_path.display());
97 return Ok(None);
98 }
99
100 let current_metadata = std::fs::metadata(file_path)
102 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
103
104 let current_modified = current_metadata
105 .modified()
106 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
107
108 let _current_modified_time = chrono::DateTime::<chrono::Utc>::from(current_modified);
109
110 let needs_update = match index.files.get(file_path) {
112 Some(old_metadata) => {
113 let checksum = crate::Indexing::Scan::ScanFile::CalculateChecksum(
115 &tokio::fs::read(file_path).await.unwrap_or_default(),
116 );
117 old_metadata.checksum != checksum
118 },
119 None => {
120 true
122 },
123 };
124
125 if !needs_update {
126 log::trace!("[UpdateIndex] File unchanged: {}", file_path.display());
127 return Ok(index.files.get(file_path).cloned());
128 }
129
130 use crate::Indexing::{
132 Scan::ScanFile::IndexFileInternal,
133 State::UpdateState::UpdateIndexMetadata,
134 };
135
136 let (metadata, symbols) = IndexFileInternal(file_path, config, &[]).await?;
137
138 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
140 crate::Indexing::State::UpdateState::AddFileToIndex(index, file_path.clone(), metadata.clone(), symbols)?;
141
142 UpdateIndexMetadata(index)?;
144
145 let elapsed = start_time.elapsed();
146 log::trace!(
147 "[UpdateIndex] Updated {} in {}ms ({} symbols)",
148 file_path.display(),
149 elapsed.as_millis(),
150 metadata.symbol_count
151 );
152
153 Ok(Some(metadata))
154}
155
156pub async fn UpdateFileContent(index:&mut FileIndex, file_path:&PathBuf, metadata:&FileMetadata) -> Result<()> {
158 if !metadata.mime_type.starts_with("text/") && !metadata.mime_type.contains("json") {
160 return Ok(());
161 }
162
163 let content = tokio::fs::read_to_string(file_path)
164 .await
165 .map_err(|e| AirError::FileSystem(format!("Failed to read file content: {}", e)))?;
166
167 for (_, files) in index.content_index.iter_mut() {
169 files.retain(|p| p != file_path);
170 }
171
172 let tokens = crate::Indexing::Process::ProcessContent::TokenizeContent(&content);
174
175 for token in tokens {
176 if token.len() > 2 {
177 index
179 .content_index
180 .entry(token)
181 .or_insert_with(Vec::new)
182 .push(file_path.clone());
183 }
184 }
185
186 Ok(())
187}
188
189pub async fn UpdateFilesBatch(
191 index:&mut FileIndex,
192 file_paths:Vec<PathBuf>,
193 config:&IndexingConfig,
194) -> Result<UpdateBatchResult> {
195 let start_time = Instant::now();
196 let mut updated_count = 0u32;
197 let mut removed_count = 0u32;
198 let mut error_count = 0u32;
199 let mut total_size = 0u64;
200
201 for file_path in file_paths {
202 match UpdateSingleFile(index, &file_path, config).await {
203 Ok(Some(metadata)) => {
204 updated_count += 1;
205 total_size += metadata.size;
206 },
207 Ok(None) => {
208 removed_count += 1;
209 },
210 Err(e) => {
211 log::warn!("[UpdateIndex] Failed to update file {}: {}", file_path.display(), e);
212 error_count += 1;
213 },
214 }
215 }
216
217 crate::Indexing::State::UpdateState::UpdateIndexMetadata(index)?;
219
220 Ok(UpdateBatchResult {
221 updated_count,
222 removed_count,
223 error_count,
224 total_size,
225 duration_seconds:start_time.elapsed().as_secs_f64(),
226 })
227}
228
/// Summary of a multi-file index operation.
#[derive(Clone, Debug)]
pub struct UpdateBatchResult {
	/// Number of files that ended up with an entry in the index.
	pub updated_count:u32,
	/// Number of files that were dropped from the index.
	pub removed_count:u32,
	/// Number of errors recorded while the operation ran.
	pub error_count:u32,
	/// Sum of the `size` values of the files counted in `updated_count`.
	pub total_size:u64,
	/// Wall-clock time of the whole operation, in seconds.
	pub duration_seconds:f64,
}
238
239pub struct DebouncedUpdate {
241 file_path:PathBuf,
242 last_seen:Instant,
243 index:*const RwLock<FileIndex>,
244 config:IndexingConfig,
245 duration:Duration,
246 pending:bool,
247}
248
249unsafe impl Send for DebouncedUpdate {}
250
251impl DebouncedUpdate {
252 pub fn new(file_path:PathBuf, index:&RwLock<FileIndex>, config:&IndexingConfig, duration:Duration) -> Self {
253 Self {
254 file_path,
255 last_seen:Instant::now(),
256 index:index as *const RwLock<FileIndex>,
257 config:config.clone(),
258 duration,
259 pending:false,
260 }
261 }
262
263 pub async fn trigger(&mut self) {
264 self.last_seen = Instant::now();
265 self.pending = true;
266 }
267
268 pub async fn process_if_ready(&mut self) -> Result<bool> {
269 if !self.pending {
270 return Ok(false);
271 }
272
273 if self.last_seen.elapsed() >= self.duration {
274 self.pending = false;
275
276 let index_ref = unsafe { &*self.index };
278 let mut index = index_ref.write().await;
279
280 match UpdateSingleFile(&mut index, &self.file_path, &self.config).await {
281 Ok(_) => {
282 log::debug!("[UpdateIndex] Debounced update completed: {}", self.file_path.display());
283 return Ok(true);
284 },
285 Err(e) => {
286 log::warn!("[UpdateIndex] Debounced update failed: {}", e);
287 return Err(e);
288 },
289 }
290 }
291
292 Ok(false)
293 }
294
295 pub fn clear_pending(&mut self) { self.pending = false; }
296}
297
298pub async fn ProcessWatcherEvent(
300 index:&mut FileIndex,
301 event:notify::Event,
302 config:&IndexingConfig,
303) -> Result<WatcherEventResult> {
304 let mut updated = 0u32;
305 let mut removed = 0u32;
306
307 for file_path in event.paths {
308 match event.kind {
309 notify::EventKind::Create(notify::event::CreateKind::File) => {
310 log::debug!("[UpdateIndex] File created: {}", file_path.display());
311 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
312 updated += 1;
313 }
314 },
315 notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
316 | notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
317 log::debug!("[UpdateIndex] File modified: {}", file_path.display());
318 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
319 updated += 1;
320 }
321 },
322 notify::EventKind::Remove(notify::event::RemoveKind::File) => {
323 log::debug!("[UpdateIndex] File removed: {}", file_path.display());
324 if super::super::State::UpdateState::RemoveFileFromIndex(index, &file_path).is_ok() {
325 removed += 1;
326 }
327 },
328 _ => {},
329 }
330 }
331
332 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
334
335 Ok(WatcherEventResult { updated, removed })
336}
337
/// Outcome counters for one processed watcher event.
#[derive(Clone, Debug)]
pub struct WatcherEventResult {
	/// Number of event paths counted as updated in the index.
	pub updated:u32,
	/// Number of event paths counted as removed from the index.
	pub removed:u32,
}
344
345pub async fn CleanupRemovedFiles(index:&mut FileIndex) -> Result<u32> {
347 let mut paths_to_remove = Vec::new();
348 let all_paths:Vec<_> = index.files.keys().cloned().collect();
349
350 for path in all_paths {
351 if !path.exists() {
352 paths_to_remove.push(path);
353 }
354 }
355
356 for path in &paths_to_remove {
357 super::super::State::UpdateState::RemoveFileFromIndex(index, path)?;
358 }
359
360 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
362
363 log::debug!("[UpdateIndex] Cleaned up {} removed files", paths_to_remove.len());
364
365 Ok(paths_to_remove.len() as u32)
366}
367
368pub async fn RebuildIndex(
370 index:&mut FileIndex,
371 directories:Vec<String>,
372 patterns:Vec<String>,
373 config:&IndexingConfig,
374) -> Result<UpdateBatchResult> {
375 let start_time = Instant::now();
376
377 index.files.clear();
379 index.content_index.clear();
380 index.symbol_index.clear();
381 index.file_symbols.clear();
382
383 let (files_to_index, scan_result) =
385 crate::Indexing::Scan::ScanDirectory::ScanDirectoriesParallel(directories, patterns, config, 10).await?;
386
387 let semaphore = Arc::new(Semaphore::new(config.MaxParallelIndexing as usize));
389 let index_arc = Arc::new(RwLock::new(index.clone()));
390 let mut tasks = Vec::new();
391
392 for file_path in files_to_index {
393 let permit = semaphore.clone().acquire_owned().await.unwrap();
394 let _index_ref = index_arc.clone();
396 let config_clone = config.clone();
397
398 let task = tokio::spawn(async move {
399 let _permit = permit;
400
401 crate::Indexing::Scan::ScanFile::IndexFileInternal(&file_path, &config_clone, &[]).await
402 });
403
404 tasks.push(task);
405 }
406
407 let mut updated_count = 0u32;
408 let mut total_size = 0u64;
409
410 for task in tasks {
411 match task.await {
412 Ok(Ok((metadata, symbols))) => {
413 let file_size = metadata.size;
414 super::super::State::UpdateState::AddFileToIndex(index, metadata.path.clone(), metadata, symbols)?;
415 updated_count += 1;
416 total_size += file_size;
417 },
418 Ok(Err(e)) => {
419 log::warn!("[UpdateIndex] Rebuild task failed: {}", e);
420 },
421 Err(e) => {
422 log::warn!("[UpdateIndex] Rebuild task join failed: {}", e);
423 },
424 }
425 }
426
427 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
429
430 Ok(UpdateBatchResult {
431 updated_count,
432 removed_count:0,
433 error_count:scan_result.errors,
434 total_size,
435 duration_seconds:start_time.elapsed().as_secs_f64(),
436 })
437}
438
439pub async fn ValidateAndRepairIndex(index:&mut FileIndex) -> Result<RepairResult> {
441 let start_time = Instant::now();
442 let mut repaired_files = 0u32;
443 let removed_orphans;
444
445 match super::super::State::UpdateState::ValidateIndexConsistency(index) {
447 Ok(()) => {},
448 Err(e) => {
449 log::warn!("[UpdateIndex] Index validation failed: {}", e);
450 repaired_files += 1;
451 },
452 }
453
454 removed_orphans = super::super::State::UpdateState::CleanupOrphanedEntries(index)?;
456
457 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
459
460 Ok(RepairResult {
461 repaired_files,
462 removed_orphans,
463 duration_seconds:start_time.elapsed().as_secs_f64(),
464 })
465}
466
/// Summary of an index validation-and-repair run.
#[derive(Clone, Debug)]
pub struct RepairResult {
	/// Counter of repairs recorded during the run.
	pub repaired_files:u32,
	/// Number of orphaned entries that were removed.
	pub removed_orphans:u32,
	/// Wall-clock time of the run, in seconds.
	pub duration_seconds:f64,
}