AirLibrary/Vine/Server/
AirVinegRPCService.rs

1//! # Air Vine gRPC Service
2//!
3//! Defines the gRPC service implementation for Air. This struct handles
4//! incoming RPC calls from Mountain, dispatches them to the appropriate
5//! services (authentication, updates, downloads, indexing), and returns
6//! the results.
7
8use std::{collections::HashMap, sync::Arc};
9
10use log::{debug, error, info, warn};
11use tonic::{Request, Response, Status};
12use tokio_stream::StreamExt as TokioStreamExt;
13use async_trait::async_trait;
14
15use crate::{
16	AirError,
17	ApplicationState::ApplicationState,
18	Authentication::AuthenticationService,
19	Downloader::DownloadManager,
20	Indexing::{
21		FileIndexer,
22		Store::QueryIndex::{SearchMode, SearchQuery},
23	},
24	Result,
25	Updates::UpdateManager,
26	Utility::CurrentTimestamp,
27	Vine::Generated::{
28		air as air_generated,
29		air::{
30			ApplyUpdateRequest,
31			ApplyUpdateResponse,
32			AuthenticationRequest,
33			AuthenticationResponse,
34			ConfigurationRequest,
35			ConfigurationResponse,
36			DownloadRequest,
37			DownloadResponse,
38			DownloadStreamRequest,
39			DownloadStreamResponse,
40			FileInfoRequest,
41			FileInfoResponse,
42			FileResult,
43			HealthCheckRequest,
44			HealthCheckResponse,
45			IndexRequest,
46			IndexResponse,
47			MetricsRequest,
48			MetricsResponse,
49			ResourceLimitsRequest,
50			ResourceLimitsResponse,
51			ResourceUsageRequest,
52			ResourceUsageResponse,
53			SearchRequest,
54			SearchResponse,
55			StatusRequest,
56			StatusResponse,
57			UpdateCheckRequest,
58			UpdateCheckResponse,
59			UpdateConfigurationRequest,
60			UpdateConfigurationResponse,
61			air_service_server::AirService,
62		},
63	},
64};
65
66/// The concrete implementation of the Air gRPC service
67#[derive(Clone)]
68pub struct AirVinegRPCService {
69	/// Application state
70	AppState:Arc<ApplicationState>,
71
72	/// Authentication service
73	AuthService:Arc<AuthenticationService>,
74
75	/// Update manager
76	UpdateManager:Arc<UpdateManager>,
77
78	/// Download manager
79	DownloadManager:Arc<DownloadManager>,
80
81	/// File indexer
82	FileIndexer:Arc<FileIndexer>,
83
84	/// Connection tracking
85	ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
86}
87
88/// Connection metadata for tracking client state
89#[derive(Debug, Clone)]
90struct ConnectionMetadata {
91	pub ClientId:String,
92	pub ClientVersion:String,
93	pub ProtocolVersion:u32,
94	pub LastRequestTime:u64,
95	pub RequestCount:u64,
96	pub ConnectionType:crate::ApplicationState::ConnectionType,
97}
98
99impl AirVinegRPCService {
100	/// Creates a new instance of the Air gRPC service
101	pub fn new(
102		AppState:Arc<ApplicationState>,
103		AuthService:Arc<AuthenticationService>,
104		UpdateManager:Arc<UpdateManager>,
105		DownloadManager:Arc<DownloadManager>,
106		FileIndexer:Arc<FileIndexer>,
107	) -> Self {
108		info!("[AirVinegRPCService] New instance created");
109
110		Self {
111			AppState,
112			AuthService,
113			UpdateManager,
114			DownloadManager,
115			FileIndexer,
116			ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
117		}
118	}
119
120	/// Track connection for a request
121	async fn TrackConnection<RequestType>(
122		&self,
123		Request:&tonic::Request<RequestType>,
124		_ServiceName:&str,
125	) -> std::result::Result<String, Status> {
126		let Metadata = Request.metadata();
127		let ConnectionId = Metadata
128			.get("connection-id")
129			.map(|v| v.to_str().unwrap_or_default().to_string())
130			.unwrap_or_else(|| crate::Utility::GenerateRequestId());
131
132		let ClientId = Metadata
133			.get("client-id")
134			.map(|v| v.to_str().unwrap_or_default().to_string())
135			.unwrap_or_else(|| "unknown".to_string());
136
137		let ClientVersion = Metadata
138			.get("client-version")
139			.map(|v| v.to_str().unwrap_or_default().to_string())
140			.unwrap_or_else(|| "unknown".to_string());
141
142		let ProtocolVersion = Metadata
143			.get("protocol-version")
144			.map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
145			.unwrap_or(1);
146
147		// Update connection tracking
148		let mut Connections = self.ActiveConnections.write().await;
149		let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
150			ConnectionMetadata {
151				ClientId:ClientId.clone(),
152				ClientVersion:ClientVersion.clone(),
153				ProtocolVersion,
154				LastRequestTime:crate::Utility::CurrentTimestamp(),
155				RequestCount:0,
156				ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
157			}
158		});
159
160		ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
161		ConnectionMetadata.RequestCount += 1;
162
163		// Register connection with application state
164		self.AppState
165			.RegisterConnection(
166				ConnectionId.clone(),
167				ClientId,
168				ClientVersion,
169				ProtocolVersion,
170				crate::ApplicationState::ConnectionType::MountainMain,
171			)
172			.await
173			.map_err(|e| Status::internal(e.to_string()))?;
174
175		Ok(ConnectionId)
176	}
177
178	/// Validate protocol version compatibility
179	fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
180		if ClientVersion > crate::ProtocolVersion {
181			return Err(Status::failed_precondition(format!(
182				"Client protocol version {} is newer than server version {}",
183				ClientVersion,
184				crate::ProtocolVersion
185			)));
186		}
187
188		if ClientVersion < crate::ProtocolVersion {
189			warn!(
190				"Client using older protocol version {} (server: {})",
191				ClientVersion,
192				crate::ProtocolVersion
193			);
194		}
195
196		Ok(())
197	}
198}
199
200#[async_trait]
201impl AirService for AirVinegRPCService {
202	/// Handle authentication requests from Mountain
203	async fn authenticate(
204		&self,
205		Request:Request<AuthenticationRequest>,
206	) -> std::result::Result<Response<AuthenticationResponse>, Status> {
207		// Track connection and validate protocol (before consuming Request)
208		let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
209
210		let RequestData = Request.into_inner();
211		let request_id = RequestData.request_id.clone();
212
213		info!(
214			"[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
215			request_id, ConnectionId
216		);
217
218		self.AppState
219			.RegisterRequest(request_id.clone(), "authentication".to_string())
220			.await
221			.map_err(|e| Status::internal(e.to_string()))?;
222
223		// Additional security validation
224		if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
225			let ErrorMessage = "Invalid authentication parameters".to_string();
226			self.AppState
227				.UpdateRequestStatus(
228					&request_id,
229					crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
230					None,
231				)
232				.await
233				.ok();
234
235			return Ok(Response::new(air_generated::AuthenticationResponse {
236				request_id,
237				success:false,
238				token:String::new(),
239				error:ErrorMessage,
240			}));
241		}
242
243		// Clone needed data before moving RequestData
244		let username_for_log = RequestData.username.clone();
245		let password = RequestData.password;
246		let provider = RequestData.provider;
247
248		let result = self
249			.AuthService
250			.AuthenticateUser(RequestData.username, password, provider)
251			.await;
252
253		match result {
254			Ok(token) => {
255				self.AppState
256					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
257					.await
258					.ok();
259
260				// Log successful authentication
261				info!(
262					"[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
263					username_for_log, ConnectionId
264				);
265
266				Ok(Response::new(air_generated::AuthenticationResponse {
267					request_id,
268					success:true,
269					token,
270					error:String::new(),
271				}))
272			},
273			Err(e) => {
274				self.AppState
275					.UpdateRequestStatus(
276						&request_id,
277						crate::ApplicationState::RequestState::Failed(e.to_string()),
278						None,
279					)
280					.await
281					.ok();
282
283				// Log failed authentication attempt
284				warn!(
285					"[AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
286					username_for_log, ConnectionId, e
287				);
288
289				Ok(Response::new(air_generated::AuthenticationResponse {
290					request_id,
291					success:false,
292					token:String::new(),
293					error:e.to_string(),
294				}))
295			},
296		}
297	}
298
299	/// Handle update check requests from Mountain
300	async fn check_for_updates(
301		&self,
302		request:Request<UpdateCheckRequest>,
303	) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
304		let RequestData = request.into_inner();
305		let request_id = RequestData.request_id.clone();
306
307		info!(
308			"[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
309			request_id, RequestData.current_version, RequestData.channel
310		);
311
312		self.AppState
313			.RegisterRequest(request_id.clone(), "updates".to_string())
314			.await
315			.map_err(|e| Status::internal(e.to_string()))?;
316
317		// Validate CurrentVersion
318		if RequestData.current_version.is_empty() {
319			let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
320			self.AppState
321				.UpdateRequestStatus(
322					&request_id,
323					crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
324					None,
325				)
326				.await
327				.ok();
328			return Err(Status::invalid_argument(ErrorMessage.to_string()));
329		}
330
331		// Validate channel
332		let ValidChannels = ["stable", "beta", "nightly"];
333		let Channel = if RequestData.channel.is_empty() {
334			"stable".to_string()
335		} else {
336			RequestData.channel.clone()
337		};
338		if !ValidChannels.contains(&Channel.as_str()) {
339			let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
340			self.AppState
341				.UpdateRequestStatus(
342					&request_id,
343					crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
344					None,
345				)
346				.await
347				.ok();
348			return Err(Status::invalid_argument(ErrorMessage));
349		}
350
351		// Check for updates using UpdateManager
352		let result = self.UpdateManager.CheckForUpdates().await;
353
354		match result {
355			Ok(UpdateInfo) => {
356				self.AppState
357					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
358					.await
359					.ok();
360
361				info!(
362					"[AirVinegRPCService] Update check successful - Available: {}",
363					UpdateInfo.is_some()
364				);
365
366				Ok(Response::new(air_generated::UpdateCheckResponse {
367					request_id,
368					update_available:UpdateInfo.is_some(),
369					version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
370					download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
371					release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
372					error:String::new(),
373				}))
374			},
375			Err(crate::AirError::Network(e)) => {
376				self.AppState
377					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
378					.await
379					.ok();
380				error!("[AirVinegRPCService] Network error during update check: {}", e);
381				Err(Status::unavailable(e))
382			},
383			Err(e) => {
384				self.AppState
385					.UpdateRequestStatus(
386						&request_id,
387						crate::ApplicationState::RequestState::Failed(e.to_string()),
388						None,
389					)
390					.await
391					.ok();
392				error!("[AirVinegRPCService] Update check failed: {}", e);
393				Ok(Response::new(air_generated::UpdateCheckResponse {
394					request_id,
395					update_available:false,
396					version:String::new(),
397					download_url:String::new(),
398					release_notes:String::new(),
399					error:e.to_string(),
400				}))
401			},
402		}
403	}
404
405	/// Handle download requests from Mountain
406	async fn download_file(
407		&self,
408		request:Request<DownloadRequest>,
409	) -> std::result::Result<Response<DownloadResponse>, Status> {
410		let RequestData = request.into_inner();
411		let request_id = RequestData.request_id.clone();
412
413		info!(
414			"[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
415			request_id, RequestData.url
416		);
417
418		// Request ID for tracking (use provided or generate)
419		let download_request_id = if request_id.is_empty() {
420			crate::Utility::GenerateRequestId()
421		} else {
422			request_id.clone()
423		};
424
425		self.AppState
426			.RegisterRequest(download_request_id.clone(), "downloader".to_string())
427			.await
428			.map_err(|e| Status::internal(e.to_string()))?;
429
430		// Validate URL
431		if RequestData.url.is_empty() {
432			let error_msg = "URL cannot be empty".to_string();
433			self.AppState
434				.UpdateRequestStatus(
435					&download_request_id,
436					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
437					None,
438				)
439				.await
440				.ok();
441			return Ok(Response::new(DownloadResponse {
442				request_id:download_request_id,
443				success:false,
444				file_path:String::new(),
445				file_size:0,
446				checksum:String::new(),
447				error:error_msg,
448			}));
449		}
450
451		// Validate URL format
452		if !match_url_scheme(&RequestData.url) {
453			let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
454			self.AppState
455				.UpdateRequestStatus(
456					&download_request_id,
457					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
458					None,
459				)
460				.await
461				.ok();
462			return Ok(Response::new(DownloadResponse {
463				request_id:download_request_id,
464				success:false,
465				file_path:String::new(),
466				file_size:0,
467				checksum:String::new(),
468				error:error_msg,
469			}));
470		}
471
472		// Validate or use cache directory
473		let DestinationPath = if RequestData.destination_path.is_empty() {
474			// Use cache directory from configuration
475			let config = &self.AppState.Configuration.Downloader;
476			config.CacheDirectory.clone()
477		} else {
478			RequestData.destination_path.clone()
479		};
480
481		// Validate target directory exists
482		let dest_path = std::path::Path::new(&DestinationPath);
483		if let Some(parent) = dest_path.parent() {
484			if !parent.exists() {
485				match tokio::fs::create_dir_all(parent).await {
486					Ok(_) => {
487						debug!("[AirVinegRPCService] Created destination directory: {}", parent.display());
488					},
489					Err(e) => {
490						let error_msg = format!("Failed to create destination directory: {}", e);
491						self.AppState
492							.UpdateRequestStatus(
493								&download_request_id,
494								crate::ApplicationState::RequestState::Failed(error_msg.clone()),
495								None,
496							)
497							.await
498							.ok();
499						return Ok(Response::new(DownloadResponse {
500							request_id:download_request_id,
501							success:false,
502							file_path:String::new(),
503							file_size:0,
504							checksum:String::new(),
505							error:error_msg,
506						}));
507					},
508				}
509			}
510		}
511
512		// Set up granular progress callback mechanism
513		let _download_manager = self.DownloadManager.clone();
514		let AppState = self.AppState.clone();
515		let callback_request_id = download_request_id.clone();
516		let progress_callback = move |progress:f32| {
517			let state = AppState.clone();
518			let id = callback_request_id.clone();
519			tokio::spawn(async move {
520				let _ = state
521					.UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
522					.await;
523			});
524		};
525
526		// Perform download with retry and progress tracking
527		let result = self
528			.download_file_with_retry(
529				&download_request_id,
530				RequestData.url.clone(),
531				DestinationPath,
532				RequestData.checksum,
533				Some(Box::new(progress_callback)),
534			)
535			.await;
536
537		match result {
538			Ok(file_info) => {
539				self.AppState
540					.UpdateRequestStatus(
541						&download_request_id,
542						crate::ApplicationState::RequestState::Completed,
543						Some(100.0),
544					)
545					.await
546					.ok();
547
548				info!(
549					"[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
550					download_request_id, file_info.size
551				);
552
553				Ok(Response::new(DownloadResponse {
554					request_id:download_request_id,
555					success:true,
556					file_path:file_info.path,
557					file_size:file_info.size,
558					checksum:file_info.checksum,
559					error:String::new(),
560				}))
561			},
562			Err(e) => {
563				self.AppState
564					.UpdateRequestStatus(
565						&download_request_id,
566						crate::ApplicationState::RequestState::Failed(e.to_string()),
567						None,
568					)
569					.await
570					.ok();
571
572				error!(
573					"[AirVinegRPCService] Download failed [ID: {}] - Error: {}",
574					download_request_id, e
575				);
576
577				Ok(Response::new(DownloadResponse {
578					request_id:download_request_id,
579					success:false,
580					file_path:String::new(),
581					file_size:0,
582					checksum:String::new(),
583					error:e.to_string(),
584				}))
585			},
586		}
587	}
588
589	/// Handle file indexing requests from Mountain
590	async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
591		let RequestData = request.into_inner();
592		let request_id = RequestData.request_id;
593
594		info!(
595			"[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
596			request_id, RequestData.path
597		);
598
599		self.AppState
600			.RegisterRequest(request_id.clone(), "indexing".to_string())
601			.await
602			.map_err(|e| Status::internal(e.to_string()))?;
603
604		let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
605
606		match result {
607			Ok(index_info) => {
608				self.AppState
609					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
610					.await
611					.ok();
612
613				Ok(Response::new(air_generated::IndexResponse {
614					request_id,
615					success:true,
616					files_indexed:index_info.files_indexed,
617					total_size:index_info.total_size,
618					error:String::new(),
619				}))
620			},
621			Err(e) => {
622				self.AppState
623					.UpdateRequestStatus(
624						&request_id,
625						crate::ApplicationState::RequestState::Failed(e.to_string()),
626						None,
627					)
628					.await
629					.ok();
630
631				Ok(Response::new(air_generated::IndexResponse {
632					request_id,
633					success:false,
634					files_indexed:0,
635					total_size:0,
636					error:e.to_string(),
637				}))
638			},
639		}
640	}
641
642	/// Handle status check requests from Mountain
643	async fn get_status(
644		&self,
645		request:Request<StatusRequest>,
646	) -> std::result::Result<Response<StatusResponse>, Status> {
647		let _RequestData = request.into_inner();
648
649		debug!("[AirVinegRPCService] Status request received");
650
651		let metrics = self.AppState.GetMetrics().await;
652		let resources = self.AppState.GetResourceUsage().await;
653
654		Ok(Response::new(air_generated::StatusResponse {
655			version:crate::VERSION.to_string(),
656			uptime_seconds:metrics.UptimeSeconds,
657			total_requests:metrics.TotalRequests,
658			successful_requests:metrics.SuccessfulRequests,
659			failed_requests:metrics.FailedRequests,
660			average_response_time:metrics.AverageResponseTime,
661			memory_usage_mb:resources.MemoryUsageMb,
662			cpu_usage_percent:resources.CPUUsagePercent,
663			active_requests:self.AppState.GetActiveRequestCount().await as u32,
664		}))
665	}
666
667	/// Handle service health check
668	async fn health_check(
669		&self,
670		_request:Request<HealthCheckRequest>,
671	) -> std::result::Result<Response<HealthCheckResponse>, Status> {
672		debug!("[AirVinegRPCService] Health check request received");
673
674		Ok(Response::new(air_generated::HealthCheckResponse {
675			healthy:true,
676			timestamp:CurrentTimestamp(),
677		}))
678	}
679
680	// ==================== Phase 2: Update Operations ====================
681
682	/// Handle download update requests
683	async fn download_update(
684		&self,
685		request:Request<DownloadRequest>,
686	) -> std::result::Result<Response<DownloadResponse>, Status> {
687		let RequestData = request.into_inner();
688		let request_id = RequestData.request_id.clone();
689
690		info!(
691			"[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
692			request_id, RequestData.url, RequestData.destination_path
693		);
694
695		self.AppState
696			.RegisterRequest(request_id.clone(), "download_update".to_string())
697			.await
698			.map_err(|e| Status::internal(e.to_string()))?;
699
700		// Validate URL is not empty
701		if RequestData.url.is_empty() {
702			let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
703			self.AppState
704				.UpdateRequestStatus(
705					&request_id,
706					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
707					None,
708				)
709				.await
710				.ok();
711			return Err(Status::invalid_argument(error_msg.to_string()));
712		}
713
714		// Validate URL format
715		if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
716			let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
717			self.AppState
718				.UpdateRequestStatus(
719					&request_id,
720					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
721					None,
722				)
723				.await
724				.ok();
725			return Err(Status::invalid_argument(error_msg.to_string()));
726		}
727
728		// Validate destination path is writeable
729		let destination = if RequestData.destination_path.is_empty() {
730			// Default to cache directory if not specified
731			self.UpdateManager
732				.GetCacheDirectory()
733				.join("update-latest.bin")
734				.to_string_lossy()
735				.to_string()
736		} else {
737			// Use provided destination
738			let dest_path = std::path::Path::new(&RequestData.destination_path);
739			if let Some(parent) = dest_path.parent() {
740				if parent.as_os_str().is_empty() {
741					// Relative path in cache directory
742					self.UpdateManager
743						.GetCacheDirectory()
744						.join(&RequestData.destination_path)
745						.to_string_lossy()
746						.to_string()
747				} else {
748					// Absolute or full path
749					RequestData.destination_path.clone()
750				}
751			} else {
752				RequestData.destination_path.clone()
753			}
754		};
755
756		// Check if destination directory exists and is writeable
757		let dest_path = std::path::Path::new(&destination);
758		if let Some(parent) = dest_path.parent() {
759			if !parent.exists() {
760				return Err(Status::failed_precondition(format!(
761					"Destination directory does not exist: {}",
762					parent.display()
763				)));
764			}
765
766			// Test writeability
767			if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
768				let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
769				self.AppState
770					.UpdateRequestStatus(
771						&request_id,
772						crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
773						None,
774					)
775					.await
776					.ok();
777				return Err(Status::permission_denied(error_msg.to_string()));
778			}
779			// Cleanup test file
780			let _ = std::fs::remove_file(parent.join(".write_test"));
781		}
782
783		// Use download manager - includes SHA256 checksum verification, progress
784		// tracking, and retry logic
785		let download_result = self
786			.DownloadManager
787			.DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
788			.await;
789
790		match download_result {
791			Ok(result) => {
792				self.AppState
793					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
794					.await
795					.ok();
796
797				info!(
798					"[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
799					result.path, result.size, result.checksum
800				);
801
802				Ok(Response::new(DownloadResponse {
803					request_id,
804					success:true,
805					file_path:result.path,
806					file_size:result.size,
807					checksum:result.checksum,
808					error:String::new(),
809				}))
810			},
811			Err(crate::AirError::Network(e)) => {
812				self.AppState
813					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
814					.await
815					.ok();
816				error!("[AirVinegRPCService] Download update network error: {}", e);
817				Err(Status::unavailable(e))
818			},
819			Err(crate::AirError::FileSystem(e)) => {
820				self.AppState
821					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
822					.await
823					.ok();
824				error!("[AirVinegRPCService] Download update filesystem error: {}", e);
825				Err(Status::internal(e))
826			},
827			Err(e) => {
828				self.AppState
829					.UpdateRequestStatus(
830						&request_id,
831						crate::ApplicationState::RequestState::Failed(e.to_string()),
832						None,
833					)
834					.await
835					.ok();
836				error!("[AirVinegRPCService] Download update failed: {}", e);
837				Ok(Response::new(DownloadResponse {
838					request_id,
839					success:false,
840					file_path:String::new(),
841					file_size:0,
842					checksum:String::new(),
843					error:e.to_string(),
844				}))
845			},
846		}
847	}
848
849	/// Handle apply update requests
850	async fn apply_update(
851		&self,
852		request:Request<ApplyUpdateRequest>,
853	) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
854		let RequestData = request.into_inner();
855		let request_id = RequestData.request_id.clone();
856
857		info!(
858			"[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
859			request_id, RequestData.version, RequestData.update_path
860		);
861
862		self.AppState
863			.RegisterRequest(request_id.clone(), "apply_update".to_string())
864			.await
865			.map_err(|e| Status::internal(e.to_string()))?;
866
867		// Validate version
868		if RequestData.version.is_empty() {
869			let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
870			self.AppState
871				.UpdateRequestStatus(
872					&request_id,
873					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
874					None,
875				)
876				.await
877				.ok();
878			return Err(Status::invalid_argument(error_msg.to_string()));
879		}
880
881		// Validate update path is not empty
882		if RequestData.update_path.is_empty() {
883			let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
884			self.AppState
885				.UpdateRequestStatus(
886					&request_id,
887					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
888					None,
889				)
890				.await
891				.ok();
892			return Err(Status::invalid_argument(error_msg.to_string()));
893		}
894
895		let update_path = std::path::Path::new(&RequestData.update_path);
896
897		// Validate update file exists
898		if !update_path.exists() {
899			let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
900			self.AppState
901				.UpdateRequestStatus(
902					&request_id,
903					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
904					None,
905				)
906				.await
907				.ok();
908			return Err(Status::not_found(error_msg.to_string()));
909		}
910
911		// Validate update file is readable and has content
912		let metadata = match std::fs::metadata(update_path) {
913			Ok(m) => m,
914			Err(e) => {
915				let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
916				self.AppState
917					.UpdateRequestStatus(
918						&request_id,
919						crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
920						None,
921					)
922					.await
923					.ok();
924				return Err(Status::internal(error_msg.to_string()));
925			},
926		};
927
928		if metadata.len() == 0 {
929			let error_msg = crate::AirError::Validation("Update file is empty".to_string());
930			self.AppState
931				.UpdateRequestStatus(
932					&request_id,
933					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
934					None,
935				)
936				.await
937				.ok();
938			return Err(Status::failed_precondition(error_msg.to_string()));
939		}
940
941		// Prepare rollback capability before applying update
942		let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
943		if let Err(ref e) = rollback_backup_path {
944			warn!(
945				"[AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback capability.",
946				e
947			);
948		}
949
950		// Verify update file integrity (checksum check)
951		match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
952			Ok(true) => {
953				info!("[AirVinegRPCService] Update verification successful, preparing for installation");
954
955				self.AppState
956					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
957					.await
958					.ok();
959
960				// Trigger graceful shutdown after returning response
961				let AppState = self.AppState.clone();
962				let version = RequestData.version.clone();
963				let self_clone = self.clone();
964
965				tokio::spawn(async move {
966					tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
967					log::info!(
968						"[AirVinegRPCService] Initiating graceful shutdown for update version {}",
969						version
970					);
971
972					// Graceful shutdown implementation
973					if let Err(e) = AppState.StopAllBackgroundTasks().await {
974						log::error!("[AirVinegRPCService] Failed to initiate graceful shutdown: {}", e);
975
976						// Implement rollback if update fails
977						log::warn!("[AirVinegRPCService] Rollback initiated due to graceful shutdown failure");
978						if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
979							log::error!("[AirVinegRPCService] Rollback failed: {}", rollback_error);
980						} else {
981							log::info!("[AirVinegRPCService] Rollback completed successfully");
982						}
983					}
984				});
985
986				Ok(Response::new(ApplyUpdateResponse {
987					request_id,
988					success:true,
989					error:String::new(),
990				}))
991			},
992			Ok(false) => {
993				let error_msg = "Update verification failed: checksum mismatch".to_string();
994				self.AppState
995					.UpdateRequestStatus(
996						&request_id,
997						crate::ApplicationState::RequestState::Failed(error_msg.clone()),
998						None,
999					)
1000					.await
1001					.ok();
1002				error!("[AirVinegRPCService] {}", error_msg);
1003
1004				// Clean up rollback backup if verification failed
1005				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1006
1007				Err(Status::failed_precondition(error_msg))
1008			},
1009			Err(crate::AirError::FileSystem(e)) => {
1010				self.AppState
1011					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1012					.await
1013					.ok();
1014				error!("[AirVinegRPCService] Update verification filesystem error: {}", e);
1015
1016				// Clean up rollback backup if verification failed
1017				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1018
1019				Err(Status::internal(e))
1020			},
1021			Err(e) => {
1022				self.AppState
1023					.UpdateRequestStatus(
1024						&request_id,
1025						crate::ApplicationState::RequestState::Failed(e.to_string()),
1026						None,
1027					)
1028					.await
1029					.ok();
1030				error!("[AirVinegRPCService] Update verification error: {}", e);
1031
1032				// Clean up rollback backup if verification failed
1033				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1034
1035				Ok(Response::new(ApplyUpdateResponse {
1036					request_id,
1037					success:false,
1038					error:e.to_string(),
1039				}))
1040			},
1041		}
1042	}
1043
1044	// ==================== Phase 3: Download Operations ====================
1045
1046	/// Handle streaming download requests with bidirectional streaming and
1047	/// chunk delivery
1048	type DownloadStreamStream =
1049		tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1050
1051	async fn download_stream(
1052		&self,
1053		request:Request<DownloadStreamRequest>,
1054	) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1055		let RequestData = request.into_inner();
1056		let request_id = RequestData.request_id.clone();
1057
1058		info!(
1059			"[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1060			request_id, RequestData.url
1061		);
1062
1063		self.AppState
1064			.RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1065			.await
1066			.map_err(|e| Status::internal(e.to_string()))?;
1067
1068		// Validate URL
1069		if RequestData.url.is_empty() {
1070			let error_msg = "URL cannot be empty".to_string();
1071			self.AppState
1072				.UpdateRequestStatus(
1073					&request_id,
1074					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1075					None,
1076				)
1077				.await
1078				.ok();
1079			return Err(Status::invalid_argument(error_msg));
1080		}
1081
1082		// Validate URL scheme
1083		if !match_url_scheme(&RequestData.url) {
1084			let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1085			self.AppState
1086				.UpdateRequestStatus(
1087					&request_id,
1088					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1089					None,
1090				)
1091				.await
1092				.ok();
1093			return Err(Status::invalid_argument(error_msg));
1094		}
1095
1096		// Validate URL supports range headers for streaming
1097		match self.validate_range_support(&RequestData.url).await {
1098			Ok(true) => {
1099				debug!("[AirVinegRPCService] URL supports range headers");
1100			},
1101			Ok(false) => {
1102				warn!("[AirVinegRPCService] URL does not support range headers, streaming may be inefficient");
1103			},
1104			Err(e) => {
1105				let error_msg = format!("Failed to validate range support: {}", e);
1106				self.AppState
1107					.UpdateRequestStatus(
1108						&request_id,
1109						crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1110						None,
1111					)
1112					.await
1113					.ok();
1114				return Err(Status::internal(error_msg));
1115			},
1116		}
1117
1118		// Create a channel for streaming responses
1119		let (tx, rx) = tokio::sync::mpsc::channel(100);
1120
1121		// Configure chunk size (8MB chunks for balance between throughput and latency)
1122		let chunk_size = 8 * 1024 * 1024; // 8MB
1123
1124		// Clone necessary data for spawn task
1125		let url = RequestData.url.clone();
1126		let headers = RequestData.headers;
1127		let download_request_id = request_id.clone();
1128		let _download_manager = self.DownloadManager.clone();
1129		let AppState = self.AppState.clone();
1130
1131		// Spawn streaming task
1132		tokio::spawn(async move {
1133			// Send initial status
1134			if tx
1135				.send(Ok(DownloadStreamResponse {
1136					request_id:download_request_id.clone(),
1137					chunk:vec![].into(),
1138					total_size:0,
1139					downloaded:0,
1140					completed:false,
1141					error:String::new(),
1142				}))
1143				.await
1144				.is_err()
1145			{
1146				log::warn!(
1147					"[AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1148					download_request_id
1149				);
1150				return;
1151			}
1152
1153			// Create HTTP client with connection pooling hints
1154			let client = reqwest::Client::builder()
1155				.pool_idle_timeout(std::time::Duration::from_secs(60))
1156				.pool_max_idle_per_host(5)
1157				.timeout(std::time::Duration::from_secs(300))
1158				.build();
1159
1160			if client.is_err() {
1161				let error = client.unwrap_err().to_string();
1162				let _ = tx
1163					.send(Ok(DownloadStreamResponse {
1164						request_id:download_request_id.clone(),
1165						chunk:vec![].into(),
1166						total_size:0,
1167						downloaded:0,
1168						completed:false,
1169						error:error.clone(),
1170					}))
1171					.await;
1172				AppState
1173					.UpdateRequestStatus(
1174						&download_request_id,
1175						crate::ApplicationState::RequestState::Failed(error),
1176						None,
1177					)
1178					.await
1179					.ok();
1180				return;
1181			}
1182
1183			let client = match client {
1184				Ok(client) => client,
1185				Err(e) => {
1186					let error = format!("Failed to create HTTP client: {}", e);
1187					let _ = tx.send(Err(Status::internal(error.clone())));
1188					AppState
1189						.UpdateRequestStatus(
1190							&download_request_id,
1191							crate::ApplicationState::RequestState::Failed(error),
1192							None,
1193						)
1194						.await
1195						.ok();
1196					return;
1197				},
1198			};
1199
1200			// Start streaming download
1201			let mut total_size:Option<u64> = None;
1202			let mut total_downloaded:u64 = 0;
1203
1204			match client
1205				.get(&url)
1206				.headers({
1207					let mut map = reqwest::header::HeaderMap::new();
1208					for (key, value) in headers {
1209						if let (Ok(header_name), Ok(header_value)) = (
1210							reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1211							reqwest::header::HeaderValue::from_str(&value),
1212						) {
1213							map.insert(header_name, header_value);
1214						}
1215					}
1216					map
1217				})
1218				.send()
1219				.await
1220			{
1221				Ok(response) => {
1222					if !response.status().is_success() {
1223						let error = format!("Download failed with status: {}", response.status());
1224						let _ = tx
1225							.send(Ok(DownloadStreamResponse {
1226								request_id:download_request_id.clone(),
1227								chunk:vec![].into(),
1228								total_size:0,
1229								downloaded:0,
1230								completed:false,
1231								error:error.clone(),
1232							}))
1233							.await;
1234						AppState
1235							.UpdateRequestStatus(
1236								&download_request_id,
1237								crate::ApplicationState::RequestState::Failed(error),
1238								None,
1239							)
1240							.await
1241							.ok();
1242						return;
1243					}
1244
1245					total_size = Some(response.content_length().unwrap_or(0));
1246					let response_tx = tx.clone();
1247					let response_id = download_request_id.clone();
1248
1249					// Stream chunks to client
1250					let mut stream = response.bytes_stream();
1251					let mut buffer = Vec::with_capacity(chunk_size);
1252					let mut last_progress:f32 = 0.0;
1253
1254					while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1255						if AppState.IsRequestCancelled(&download_request_id).await {
1256							log::info!(
1257								"[AirVinegRPCService] Download cancelled by client [ID: {}]",
1258								download_request_id
1259							);
1260							AppState
1261								.UpdateRequestStatus(
1262									&download_request_id,
1263									crate::ApplicationState::RequestState::Cancelled,
1264									None,
1265								)
1266								.await
1267								.ok();
1268							return;
1269						}
1270
1271						match chunk_result {
1272							Ok(chunk) => {
1273								buffer.extend_from_slice(&chunk);
1274								total_downloaded += chunk.len() as u64;
1275
1276								// Send chunk when buffer reaches target size
1277								if buffer.len() >= chunk_size {
1278									// Calculate checksum for verification
1279									let _chunk_checksum = calculate_chunk_checksum(&buffer);
1280
1281									// Calculate progress
1282									let progress = if let Some(ts) = total_size {
1283									if ts > 0 {
1284									(total_downloaded as f32 / ts as f32) * 100.0
1285									} else {
1286									0.0
1287									}
1288									} else {
1289									0.0
1290									};
1291
1292									// Update request status periodically
1293									if progress - last_progress >= 5.0 {
1294										AppState
1295											.UpdateRequestStatus(
1296												&download_request_id,
1297												crate::ApplicationState::RequestState::InProgress,
1298												Some(progress),
1299											)
1300											.await
1301											.ok();
1302										last_progress = progress;
1303									}
1304
1305									if response_tx
1306									.send(Ok(DownloadStreamResponse {
1307									request_id:response_id.clone(),
1308									chunk:buffer.clone().into(),
1309									total_size: total_size.unwrap_or(0),
1310									downloaded:total_downloaded,
1311									completed:false,
1312									error:String::new(),
1313									}))
1314										.await
1315										.is_err()
1316									{
1317										log::warn!(
1318											"[AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1319											download_request_id
1320										);
1321										AppState
1322											.UpdateRequestStatus(
1323												&download_request_id,
1324												crate::ApplicationState::RequestState::Failed(
1325													"Client disconnected".to_string(),
1326												),
1327												None,
1328											)
1329											.await
1330											.ok();
1331										return;
1332									}
1333
1334									debug!(
1335										"[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1336										buffer.len(),
1337										download_request_id,
1338										progress
1339									);
1340
1341									buffer.clear();
1342								}
1343							},
1344							Err(e) => {
1345								let error = format!("Download error: {}", e);
1346								log::error!(
1347									"[AirVinegRPCService] Stream download failed [ID: {}]: {}",
1348									download_request_id,
1349									error
1350								);
1351
1352								let _ = response_tx
1353								.send(Ok(DownloadStreamResponse {
1354								request_id:response_id.clone(),
1355								chunk:vec![].into(),
1356								total_size: total_size.unwrap_or(0),
1357								downloaded:total_downloaded,
1358								completed:false,
1359								error:error.clone(),
1360								}))
1361									.await;
1362
1363								AppState
1364									.UpdateRequestStatus(
1365										&download_request_id,
1366										crate::ApplicationState::RequestState::Failed(error),
1367										None,
1368									)
1369									.await
1370									.ok();
1371								return;
1372							},
1373						}
1374					}
1375
1376					// Send remaining buffered data
1377					if !buffer.is_empty() {
1378						let _chunk_checksum = calculate_chunk_checksum(&buffer);
1379
1380						if tx
1381						.send(Ok(DownloadStreamResponse {
1382						request_id:download_request_id.clone(),
1383						chunk:buffer.into(),
1384						total_size: total_size.unwrap_or(0),
1385						downloaded:total_downloaded,
1386						completed:false,
1387						error:String::new(),
1388						}))
1389							.await
1390							.is_err()
1391						{
1392							log::warn!(
1393								"[AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1394								download_request_id
1395							);
1396							return;
1397						}
1398					}
1399
1400					// Send completion signal
1401					AppState
1402						.UpdateRequestStatus(
1403							&download_request_id,
1404							crate::ApplicationState::RequestState::Completed,
1405							Some(100.0),
1406						)
1407						.await
1408						.ok();
1409
1410					let _ = tx
1411					.send(Ok(DownloadStreamResponse {
1412					request_id,
1413					chunk:vec![].into(),
1414					total_size: total_size.unwrap_or(0),
1415					downloaded:total_downloaded,
1416					completed:true,
1417					error:String::new(),
1418					}))
1419						.await;
1420
1421					info!(
1422						"[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1423						download_request_id, total_downloaded
1424					);
1425				},
1426				Err(e) => {
1427					let error = format!("Failed to start streaming download: {}", e);
1428					log::error!(
1429						"[AirVinegRPCService] Stream download error [ID: {}]: {}",
1430						download_request_id,
1431						error
1432					);
1433
1434					let _ = tx
1435						.send(Ok(DownloadStreamResponse {
1436							request_id:download_request_id.clone(),
1437							chunk:vec![].into(),
1438							total_size:0,
1439							downloaded:0,
1440							completed:false,
1441							error:error.clone(),
1442						}))
1443						.await;
1444
1445					AppState
1446						.UpdateRequestStatus(
1447							&download_request_id,
1448							crate::ApplicationState::RequestState::Failed(error),
1449							None,
1450						)
1451						.await
1452						.ok();
1453				},
1454			}
1455		});
1456
1457		Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1458	}
1459
1460	// ==================== Phase 4: Indexing Operations ====================
1461
1462	/// Handle file search requests
1463	async fn search_files(
1464		&self,
1465		request:Request<SearchRequest>,
1466	) -> std::result::Result<Response<SearchResponse>, Status> {
1467		let RequestData = request.into_inner();
1468		let request_id = RequestData.request_id.clone();
1469
1470		debug!(
1471			"[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1472			RequestData.query, RequestData.path
1473		);
1474
1475		// Validate search query
1476		if RequestData.query.is_empty() {
1477			return Ok(Response::new(SearchResponse {
1478				request_id,
1479				results:vec![],
1480				total_results:0,
1481				error:"Search query cannot be empty".to_string(),
1482			}));
1483		}
1484
1485		// Use file indexer to search - convert to match the existing signature
1486		let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1487		let _search_path = path.as_deref();
1488
1489		match self
1490			.FileIndexer
1491			.SearchFiles(
1492				SearchQuery {
1493					query:RequestData.query.clone(),
1494					mode:SearchMode::Literal,
1495					case_sensitive:false,
1496					whole_word:false,
1497					regex:None,
1498					max_results:RequestData.max_results,
1499					page:1,
1500				},
1501				path,
1502				None,
1503			)
1504			.await
1505		{
1506			Ok(search_results) => {
1507				// Convert from internal SearchResult to proto FileResult
1508				let mut file_results = Vec::new();
1509				for r in search_results {
1510					// Create a preview from the first match if available
1511					let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1512						(first_match.line_content.clone(), first_match.line_number)
1513					} else {
1514						(String::new(), 0)
1515					};
1516
1517					// Get file size from metadata or filesystem
1518					let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1519						metadata.size
1520					} else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1521						file_metadata.len()
1522					} else {
1523						0
1524					};
1525
1526					file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1527				}
1528
1529				info!("[AirVinegRPCService] Search completed: {} results found", file_results.len());
1530
1531				let result_count = file_results.len();
1532				Ok(Response::new(SearchResponse {
1533					request_id,
1534					results:file_results,
1535					total_results:result_count as u32,
1536					error:String::new(),
1537				}))
1538			},
1539			Err(e) => {
1540				error!("[AirVinegRPCService] Search failed: {}", e);
1541				Ok(Response::new(SearchResponse {
1542					request_id,
1543					results:vec![],
1544					total_results:0,
1545					error:e.to_string(),
1546				}))
1547			},
1548		}
1549	}
1550
1551	/// Handle get file info requests
1552	async fn get_file_info(
1553		&self,
1554		request:Request<FileInfoRequest>,
1555	) -> std::result::Result<Response<FileInfoResponse>, Status> {
1556		let RequestData = request.into_inner();
1557		let request_id = RequestData.request_id.clone();
1558
1559		debug!("[AirVinegRPCService] Get file info request: {}", RequestData.path);
1560
1561		// Validate path
1562		if RequestData.path.is_empty() {
1563			return Ok(Response::new(FileInfoResponse {
1564				request_id,
1565				exists:false,
1566				size:0,
1567				mime_type:String::new(),
1568				checksum:String::new(),
1569				modified_time:0,
1570				error:"Path cannot be empty".to_string(),
1571			}));
1572		}
1573
1574		// Get file metadata
1575		use std::path::Path;
1576		let path = Path::new(&RequestData.path);
1577
1578		if !path.exists() {
1579			return Ok(Response::new(FileInfoResponse {
1580				request_id,
1581				exists:false,
1582				size:0,
1583				mime_type:String::new(),
1584				checksum:String::new(),
1585				modified_time:0,
1586				error:String::new(), // File not found is not an error
1587			}));
1588		}
1589
1590		// Get file metadata using std::fs
1591		match std::fs::metadata(path) {
1592			Ok(metadata) => {
1593				let modified_time = metadata
1594					.modified()
1595					.ok()
1596					.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1597					.map(|d| d.as_secs())
1598					.unwrap_or(0);
1599
1600				// Detect MIME type
1601				let mime_type = self.detect_mime_type(path);
1602
1603				// Calculate checksum lazily or on-demand
1604				let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1605					log::warn!("[AirVinegRPCService] Failed to calculate checksum: {}", e);
1606					String::new()
1607				});
1608
1609				Ok(Response::new(FileInfoResponse {
1610					request_id,
1611					exists:true,
1612					size:metadata.len(),
1613					mime_type,
1614					checksum,
1615					modified_time,
1616					error:String::new(),
1617				}))
1618			},
1619			Err(e) => {
1620				error!("[AirVinegRPCService] Failed to get file metadata: {}", e);
1621				Ok(Response::new(FileInfoResponse {
1622					request_id,
1623					exists:false,
1624					size:0,
1625					mime_type:String::new(),
1626					checksum:String::new(),
1627					modified_time:0,
1628					error:e.to_string(),
1629				}))
1630			},
1631		}
1632	}
1633
1634	// ==================== Phase 5: Monitoring & Metrics ====================
1635
1636	/// Handle get metrics requests
1637	async fn get_metrics(
1638		&self,
1639		request:Request<MetricsRequest>,
1640	) -> std::result::Result<Response<MetricsResponse>, Status> {
1641		let RequestData = request.into_inner();
1642		let request_id = RequestData.request_id.clone();
1643
1644		debug!("[AirVinegRPCService] Get metrics request: type='{}'", RequestData.metric_type);
1645
1646		let metrics = self.AppState.GetMetrics().await;
1647		let mut metrics_map = std::collections::HashMap::new();
1648
1649		// Performance metrics
1650		if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1651			metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1652			metrics_map.insert("total_requests".to_string(), metrics.TotalRequests.to_string());
1653			metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequests.to_string());
1654			metrics_map.insert("failed_requests".to_string(), metrics.FailedRequests.to_string());
1655			metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1656		}
1657
1658		// Request metrics
1659		if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1660			metrics_map.insert(
1661				"ActiveRequests".to_string(),
1662				self.AppState.GetActiveRequestCount().await.to_string(),
1663			);
1664		}
1665
1666		Ok(Response::new(MetricsResponse {
1667			request_id,
1668			metrics:metrics_map,
1669			error:String::new(),
1670		}))
1671	}
1672
1673	/// Handle get resource usage requests
1674	async fn get_resource_usage(
1675		&self,
1676		request:Request<ResourceUsageRequest>,
1677	) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1678		let RequestData = request.into_inner();
1679		let request_id = RequestData.request_id.clone();
1680
1681		debug!("[AirVinegRPCService] Get resource usage request");
1682
1683		let resources = self.AppState.GetResourceUsage().await;
1684
1685		Ok(Response::new(ResourceUsageResponse {
1686			request_id,
1687			memory_usage_mb:resources.MemoryUsageMb,
1688			cpu_usage_percent:resources.CPUUsagePercent,
1689			disk_usage_mb:resources.DiskUsageMb,
1690			network_usage_mbps:resources.NetworkUsageMbps,
1691			error:String::new(),
1692		}))
1693	}
1694
1695	/// Handle set resource limits requests
1696	async fn set_resource_limits(
1697		&self,
1698		request:Request<ResourceLimitsRequest>,
1699	) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1700		let RequestData = request.into_inner();
1701		let request_id = RequestData.request_id.clone();
1702
1703		info!(
1704			"[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1705			RequestData.memory_limit_mb, RequestData.cpu_limit_percent, RequestData.disk_limit_mb
1706		);
1707
1708		// Validate limits
1709		if RequestData.memory_limit_mb == 0 {
1710			return Ok(Response::new(ResourceLimitsResponse {
1711				request_id,
1712				success:false,
1713				error:"Memory limit must be greater than 0".to_string(),
1714			}));
1715		}
1716
1717		if RequestData.cpu_limit_percent > 100 {
1718			return Ok(Response::new(ResourceLimitsResponse {
1719				request_id,
1720				success:false,
1721				error:"CPU limit cannot exceed 100%".to_string(),
1722			}));
1723		}
1724
1725		// Apply new limits via ApplicationState
1726		let result = self
1727			.AppState
1728			.SetResourceLimits(
1729				Some(RequestData.memory_limit_mb as u64),
1730				Some(RequestData.cpu_limit_percent as f64),
1731				Some(RequestData.disk_limit_mb as u64),
1732			)
1733			.await;
1734
1735		match result {
1736			Ok(_) => {
1737				Ok(Response::new(ResourceLimitsResponse {
1738					request_id,
1739					success:true,
1740					error:String::new(),
1741				}))
1742			},
1743			Err(e) => {
1744				Ok(Response::new(ResourceLimitsResponse {
1745					request_id,
1746					success:false,
1747					error:e.to_string(),
1748				}))
1749			},
1750		}
1751	}
1752
1753	// ==================== Phase 6: Configuration Management ====================
1754
1755	/// Handle get configuration requests
1756	async fn get_configuration(
1757		&self,
1758		request:Request<ConfigurationRequest>,
1759	) -> std::result::Result<Response<ConfigurationResponse>, Status> {
1760		let RequestData = request.into_inner();
1761		let request_id = RequestData.request_id.clone();
1762
1763		debug!(
1764			"[AirVinegRPCService] Get configuration request: section='{}'",
1765			RequestData.section
1766		);
1767
1768		// Get configuration from ApplicationState
1769		let config = self.AppState.GetConfiguration().await;
1770		let mut config_map = std::collections::HashMap::new();
1771
1772		// Serialize config to map, filter by section if specified
1773		match RequestData.section.as_str() {
1774			"grpc" => {
1775				config_map.insert("bind_address".to_string(), config.gRPC.BindAddress.clone());
1776				config_map.insert("max_connections".to_string(), config.gRPC.MaxConnections.to_string());
1777				config_map.insert("request_timeout_secs".to_string(), config.gRPC.RequestTimeoutSecs.to_string());
1778			},
1779			"authentication" => {
1780				config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
1781				config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
1782				config_map.insert(
1783					"token_expiration_hours".to_string(),
1784					config.Authentication.TokenExpirationHours.to_string(),
1785				);
1786			},
1787			"updates" => {
1788				config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
1789				config_map.insert(
1790					"check_interval_hours".to_string(),
1791					config.Updates.CheckIntervalHours.to_string(),
1792				);
1793				config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
1794				config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
1795				config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
1796			},
1797			"downloader" => {
1798				config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
1799				config_map.insert(
1800					"max_concurrent_downloads".to_string(),
1801					config.Downloader.MaxConcurrentDownloads.to_string(),
1802				);
1803				config_map.insert(
1804					"download_timeout_secs".to_string(),
1805					config.Downloader.DownloadTimeoutSecs.to_string(),
1806				);
1807				config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
1808				config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
1809			},
1810			"indexing" => {
1811				config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
1812				config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
1813				config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
1814				config_map.insert(
1815					"update_interval_minutes".to_string(),
1816					config.Indexing.UpdateIntervalMinutes.to_string(),
1817				);
1818				config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
1819			},
1820			_ => {
1821				// Return all sections (redacted for sensitive values)
1822				config_map.insert("_grpc_enabled".to_string(), "true".to_string());
1823			},
1824		}
1825
1826		Ok(Response::new(ConfigurationResponse {
1827			request_id,
1828			configuration:config_map,
1829			error:String::new(),
1830		}))
1831	}
1832
1833	/// Handle update configuration requests
1834	async fn update_configuration(
1835		&self,
1836		request:Request<UpdateConfigurationRequest>,
1837	) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
1838		let RequestData = request.into_inner();
1839		let request_id = RequestData.request_id.clone();
1840
1841		info!(
1842			"[AirVinegRPCService] Update configuration request: section='{}'",
1843			RequestData.section
1844		);
1845
1846		// Validate section
1847		if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
1848		{
1849			return Ok(Response::new(UpdateConfigurationResponse {
1850				request_id,
1851				success:false,
1852				error:"Invalid configuration section".to_string(),
1853			}));
1854		}
1855
1856		// Update configuration via ApplicationState
1857		let result = self
1858			.AppState
1859			.UpdateConfiguration(RequestData.section, RequestData.updates)
1860			.await;
1861
1862		match result {
1863			Ok(_) => {
1864				Ok(Response::new(UpdateConfigurationResponse {
1865					request_id,
1866					success:true,
1867					error:String::new(),
1868				}))
1869			},
1870			Err(e) => {
1871				Ok(Response::new(UpdateConfigurationResponse {
1872					request_id,
1873					success:false,
1874					error:e.to_string(),
1875				}))
1876			},
1877		}
1878	}
1879}
1880
1881// ==================== Helper Methods ====================
1882
1883impl AirVinegRPCService {
1884	/// Detect MIME type based on file extension
1885	fn detect_mime_type(&self, path:&std::path::Path) -> String {
1886		match path.extension().and_then(|e| e.to_str()) {
1887			Some("rs") => "text/x-rust".to_string(),
1888			Some("ts") => "application/typescript".to_string(),
1889			Some("js") => "application/javascript".to_string(),
1890			Some("json") => "application/json".to_string(),
1891			Some("toml") => "application/toml".to_string(),
1892			Some("md") => "text/markdown".to_string(),
1893			Some("txt") => "text/plain".to_string(),
1894			Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
1895			Some("html") => "text/html".to_string(),
1896			Some("css") => "text/css".to_string(),
1897			Some("xml") => "application/xml".to_string(),
1898			Some("png") => "image/png".to_string(),
1899			Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
1900			Some("gif") => "image/gif".to_string(),
1901			Some("svg") => "image/svg+xml".to_string(),
1902			Some("pdf") => "application/pdf".to_string(),
1903			Some("zip") => "application/zip".to_string(),
1904			Some("tar") | Some("gz") => "application/x-tar".to_string(),
1905			Some("proto") => "application/x-protobuf".to_string(),
1906			_ => "application/octet-stream".to_string(),
1907		}
1908	}
1909
1910	/// Download file with exponential backoff retry
1911	/// Returns file_info (path, size, checksum) from DownloadManager
1912	async fn download_file_with_retry(
1913		&self,
1914		request_id:&str,
1915		url:String,
1916		DestinationPath:String,
1917		checksum:String,
1918		progress_callback:Option<Box<dyn Fn(f32) + Send>>,
1919	) -> Result<crate::Downloader::DownloadResult> {
1920		let config = &self.AppState.Configuration.Downloader;
1921		let mut retries = 0;
1922
1923		loop {
1924			match self
1925				.DownloadManager
1926				.DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
1927				.await
1928			{
1929				Ok(file_info) => {
1930					if let Some(ref callback) = progress_callback {
1931						callback(100.0);
1932					}
1933					return Ok(file_info);
1934				},
1935				Err(e) => {
1936					if retries < config.MaxRetries as usize {
1937						retries += 1;
1938						let backoff_secs = 2u64.pow(retries as u32);
1939						warn!(
1940							"[AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - Backing \
1941							 off {} seconds",
1942							request_id, retries, config.MaxRetries, e, backoff_secs
1943						);
1944
1945						if let Some(ref callback) = progress_callback {
1946							// Notify retry attempts
1947							let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
1948							callback(progress);
1949						}
1950
1951						tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
1952					} else {
1953						error!(
1954							"[AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
1955							config.MaxRetries, request_id, e
1956						);
1957						return Err(e);
1958					}
1959				},
1960			}
1961		}
1962	}
1963
1964	/// Validate URL supports range headers for streaming
1965	async fn validate_range_support(&self, url:&str) -> Result<bool> {
1966		let client = reqwest::Client::builder()
1967			.timeout(std::time::Duration::from_secs(10))
1968			.build()
1969			.map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
1970
1971		let response = client
1972			.head(url)
1973			.send()
1974			.await
1975			.map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
1976
1977		// Check if server supports range requests
1978		let accepts_ranges = response
1979			.headers()
1980			.get("accept-ranges")
1981			.map(|v| v.to_str().unwrap_or("none"))
1982			.unwrap_or("none");
1983
1984		Ok(accepts_ranges == "bytes")
1985	}
1986
1987	/// Prepare rollback backup before applying update
1988	async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
1989		let cache_dir = self.UpdateManager.GetCacheDirectory();
1990		let rollback_dir = cache_dir.join("rollback");
1991
1992		// Create rollback directory if it doesn't exist
1993		if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
1994			return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
1995		}
1996
1997		// Create backup marker file with version
1998		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
1999		let marker_content = format!(
2000			"version={}\ntimestamp={}\nrollback_available=true",
2001			version,
2002			chrono::Utc::now().to_rfc3339()
2003		);
2004
2005		if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2006			return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2007		}
2008
2009		info!(
2010			"[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2011			version, backup_file
2012		);
2013
2014		Ok(())
2015	}
2016
2017	/// Cleanup rollback backup after successful update or failed verification
2018	async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2019		let cache_dir = self.UpdateManager.GetCacheDirectory();
2020		let rollback_dir = cache_dir.join("rollback");
2021		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2022
2023		if backup_file.exists() {
2024			if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2025				return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2026			}
2027			info!("[AirVinegRPCService] Rollback backup cleaned up for version {}", version);
2028		}
2029
2030		Ok(())
2031	}
2032
2033	/// Perform rollback to previous version
2034	async fn perform_rollback(&self, version:&str) -> Result<()> {
2035		let cache_dir = self.UpdateManager.GetCacheDirectory();
2036		let rollback_dir = cache_dir.join("rollback");
2037		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2038
2039		if !backup_file.exists() {
2040			return Err(AirError::FileSystem(format!(
2041				"Rollback backup not found for version {}",
2042				version
2043			)));
2044		}
2045
2046		log::info!("[AirVinegRPCService] Starting rollback for version {}", version);
2047
2048		// Read backup marker
2049		let marker_content = tokio::fs::read_to_string(&backup_file)
2050			.await
2051			.map_err(|e| format!("Failed to read backup marker: {}", e))?;
2052
2053		// Parse marker content
2054		let mut timestamp = None;
2055		let mut rollback_available = false;
2056
2057		for line in marker_content.lines() {
2058			if let Some(value) = line.strip_prefix("timestamp=") {
2059				timestamp = Some(value.to_string());
2060			} else if line == "rollback_available=true" {
2061				rollback_available = true;
2062			}
2063		}
2064
2065		if !rollback_available {
2066			return Err(AirError::Validation("Rollback not available for this version".to_string()));
2067		}
2068
2069		// Perform actual rollback logic
2070		// This would involve:
2071		// 1. Restoring previous binary/files
2072		// 2. Reverting configuration changes
2073		// 3. Cleaning up failed update artifacts
2074
2075		log::info!(
2076			"[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2077			version,
2078			timestamp
2079		);
2080
2081		// Cleanup backup marker after successful rollback
2082		if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2083			log::warn!("[AirVinegRPCService] Failed to cleanup backup marker after rollback: {}", e);
2084		}
2085
2086		Ok(())
2087	}
2088}
2089
2090/// Validate URL has a valid scheme
2091fn match_url_scheme(url:&str) -> bool {
2092	url.to_lowercase().starts_with("http://") || url.to_lowercase().starts_with("https://")
2093}
2094
2095/// Calculate chunk checksum for verification
2096fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2097	use sha2::{Digest, Sha256};
2098	let mut hasher = Sha256::new();
2099	hasher.update(chunk);
2100	format!("{:x}", hasher.finalize())
2101}
2102
2103/// Calculate file checksum for integrity verification
2104async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2105	use sha2::{Digest, Sha256};
2106	use tokio::io::AsyncReadExt;
2107
2108	let mut file = tokio::fs::File::open(path)
2109		.await
2110		.map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2111
2112	let mut hasher = Sha256::new();
2113	let mut buffer = vec![0u8; 8192];
2114
2115	loop {
2116		let bytes_read = file
2117			.read(&mut buffer)
2118			.await
2119			.map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2120
2121		if bytes_read == 0 {
2122			break;
2123		}
2124
2125		hasher.update(&buffer[..bytes_read]);
2126	}
2127
2128	let result = hasher.finalize();
2129	Ok(format!("{:x}", result))
2130}