1use std::{collections::HashMap, sync::Arc};
9
10use log::{debug, error, info, warn};
11use tonic::{Request, Response, Status};
12use tokio_stream::StreamExt as TokioStreamExt;
13use async_trait::async_trait;
14
15use crate::{
16 AirError,
17 ApplicationState::ApplicationState,
18 Authentication::AuthenticationService,
19 Downloader::DownloadManager,
20 Indexing::{
21 FileIndexer,
22 Store::QueryIndex::{SearchMode, SearchQuery},
23 },
24 Result,
25 Updates::UpdateManager,
26 Utility::CurrentTimestamp,
27 Vine::Generated::{
28 air as air_generated,
29 air::{
30 ApplyUpdateRequest,
31 ApplyUpdateResponse,
32 AuthenticationRequest,
33 AuthenticationResponse,
34 ConfigurationRequest,
35 ConfigurationResponse,
36 DownloadRequest,
37 DownloadResponse,
38 DownloadStreamRequest,
39 DownloadStreamResponse,
40 FileInfoRequest,
41 FileInfoResponse,
42 FileResult,
43 HealthCheckRequest,
44 HealthCheckResponse,
45 IndexRequest,
46 IndexResponse,
47 MetricsRequest,
48 MetricsResponse,
49 ResourceLimitsRequest,
50 ResourceLimitsResponse,
51 ResourceUsageRequest,
52 ResourceUsageResponse,
53 SearchRequest,
54 SearchResponse,
55 StatusRequest,
56 StatusResponse,
57 UpdateCheckRequest,
58 UpdateCheckResponse,
59 UpdateConfigurationRequest,
60 UpdateConfigurationResponse,
61 air_service_server::AirService,
62 },
63 },
64};
65
66#[derive(Clone)]
68pub struct AirVinegRPCService {
69 AppState:Arc<ApplicationState>,
71
72 AuthService:Arc<AuthenticationService>,
74
75 UpdateManager:Arc<UpdateManager>,
77
78 DownloadManager:Arc<DownloadManager>,
80
81 FileIndexer:Arc<FileIndexer>,
83
84 ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
86}
87
88#[derive(Debug, Clone)]
90struct ConnectionMetadata {
91 pub ClientId:String,
92 pub ClientVersion:String,
93 pub ProtocolVersion:u32,
94 pub LastRequestTime:u64,
95 pub RequestCount:u64,
96 pub ConnectionType:crate::ApplicationState::ConnectionType,
97}
98
99impl AirVinegRPCService {
100 pub fn new(
102 AppState:Arc<ApplicationState>,
103 AuthService:Arc<AuthenticationService>,
104 UpdateManager:Arc<UpdateManager>,
105 DownloadManager:Arc<DownloadManager>,
106 FileIndexer:Arc<FileIndexer>,
107 ) -> Self {
108 info!("[AirVinegRPCService] New instance created");
109
110 Self {
111 AppState,
112 AuthService,
113 UpdateManager,
114 DownloadManager,
115 FileIndexer,
116 ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
117 }
118 }
119
120 async fn TrackConnection<RequestType>(
122 &self,
123 Request:&tonic::Request<RequestType>,
124 _ServiceName:&str,
125 ) -> std::result::Result<String, Status> {
126 let Metadata = Request.metadata();
127 let ConnectionId = Metadata
128 .get("connection-id")
129 .map(|v| v.to_str().unwrap_or_default().to_string())
130 .unwrap_or_else(|| crate::Utility::GenerateRequestId());
131
132 let ClientId = Metadata
133 .get("client-id")
134 .map(|v| v.to_str().unwrap_or_default().to_string())
135 .unwrap_or_else(|| "unknown".to_string());
136
137 let ClientVersion = Metadata
138 .get("client-version")
139 .map(|v| v.to_str().unwrap_or_default().to_string())
140 .unwrap_or_else(|| "unknown".to_string());
141
142 let ProtocolVersion = Metadata
143 .get("protocol-version")
144 .map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
145 .unwrap_or(1);
146
147 let mut Connections = self.ActiveConnections.write().await;
149 let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
150 ConnectionMetadata {
151 ClientId:ClientId.clone(),
152 ClientVersion:ClientVersion.clone(),
153 ProtocolVersion,
154 LastRequestTime:crate::Utility::CurrentTimestamp(),
155 RequestCount:0,
156 ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
157 }
158 });
159
160 ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
161 ConnectionMetadata.RequestCount += 1;
162
163 self.AppState
165 .RegisterConnection(
166 ConnectionId.clone(),
167 ClientId,
168 ClientVersion,
169 ProtocolVersion,
170 crate::ApplicationState::ConnectionType::MountainMain,
171 )
172 .await
173 .map_err(|e| Status::internal(e.to_string()))?;
174
175 Ok(ConnectionId)
176 }
177
178 fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
180 if ClientVersion > crate::ProtocolVersion {
181 return Err(Status::failed_precondition(format!(
182 "Client protocol version {} is newer than server version {}",
183 ClientVersion,
184 crate::ProtocolVersion
185 )));
186 }
187
188 if ClientVersion < crate::ProtocolVersion {
189 warn!(
190 "Client using older protocol version {} (server: {})",
191 ClientVersion,
192 crate::ProtocolVersion
193 );
194 }
195
196 Ok(())
197 }
198}
199
200#[async_trait]
201impl AirService for AirVinegRPCService {
202 async fn authenticate(
204 &self,
205 Request:Request<AuthenticationRequest>,
206 ) -> std::result::Result<Response<AuthenticationResponse>, Status> {
207 let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
209
210 let RequestData = Request.into_inner();
211 let request_id = RequestData.request_id.clone();
212
213 info!(
214 "[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
215 request_id, ConnectionId
216 );
217
218 self.AppState
219 .RegisterRequest(request_id.clone(), "authentication".to_string())
220 .await
221 .map_err(|e| Status::internal(e.to_string()))?;
222
223 if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
225 let ErrorMessage = "Invalid authentication parameters".to_string();
226 self.AppState
227 .UpdateRequestStatus(
228 &request_id,
229 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
230 None,
231 )
232 .await
233 .ok();
234
235 return Ok(Response::new(air_generated::AuthenticationResponse {
236 request_id,
237 success:false,
238 token:String::new(),
239 error:ErrorMessage,
240 }));
241 }
242
243 let username_for_log = RequestData.username.clone();
245 let password = RequestData.password;
246 let provider = RequestData.provider;
247
248 let result = self
249 .AuthService
250 .AuthenticateUser(RequestData.username, password, provider)
251 .await;
252
253 match result {
254 Ok(token) => {
255 self.AppState
256 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
257 .await
258 .ok();
259
260 info!(
262 "[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
263 username_for_log, ConnectionId
264 );
265
266 Ok(Response::new(air_generated::AuthenticationResponse {
267 request_id,
268 success:true,
269 token,
270 error:String::new(),
271 }))
272 },
273 Err(e) => {
274 self.AppState
275 .UpdateRequestStatus(
276 &request_id,
277 crate::ApplicationState::RequestState::Failed(e.to_string()),
278 None,
279 )
280 .await
281 .ok();
282
283 warn!(
285 "[AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
286 username_for_log, ConnectionId, e
287 );
288
289 Ok(Response::new(air_generated::AuthenticationResponse {
290 request_id,
291 success:false,
292 token:String::new(),
293 error:e.to_string(),
294 }))
295 },
296 }
297 }
298
299 async fn check_for_updates(
301 &self,
302 request:Request<UpdateCheckRequest>,
303 ) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
304 let RequestData = request.into_inner();
305 let request_id = RequestData.request_id.clone();
306
307 info!(
308 "[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
309 request_id, RequestData.current_version, RequestData.channel
310 );
311
312 self.AppState
313 .RegisterRequest(request_id.clone(), "updates".to_string())
314 .await
315 .map_err(|e| Status::internal(e.to_string()))?;
316
317 if RequestData.current_version.is_empty() {
319 let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
320 self.AppState
321 .UpdateRequestStatus(
322 &request_id,
323 crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
324 None,
325 )
326 .await
327 .ok();
328 return Err(Status::invalid_argument(ErrorMessage.to_string()));
329 }
330
331 let ValidChannels = ["stable", "beta", "nightly"];
333 let Channel = if RequestData.channel.is_empty() {
334 "stable".to_string()
335 } else {
336 RequestData.channel.clone()
337 };
338 if !ValidChannels.contains(&Channel.as_str()) {
339 let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
340 self.AppState
341 .UpdateRequestStatus(
342 &request_id,
343 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
344 None,
345 )
346 .await
347 .ok();
348 return Err(Status::invalid_argument(ErrorMessage));
349 }
350
351 let result = self.UpdateManager.CheckForUpdates().await;
353
354 match result {
355 Ok(UpdateInfo) => {
356 self.AppState
357 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
358 .await
359 .ok();
360
361 info!(
362 "[AirVinegRPCService] Update check successful - Available: {}",
363 UpdateInfo.is_some()
364 );
365
366 Ok(Response::new(air_generated::UpdateCheckResponse {
367 request_id,
368 update_available:UpdateInfo.is_some(),
369 version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
370 download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
371 release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
372 error:String::new(),
373 }))
374 },
375 Err(crate::AirError::Network(e)) => {
376 self.AppState
377 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
378 .await
379 .ok();
380 error!("[AirVinegRPCService] Network error during update check: {}", e);
381 Err(Status::unavailable(e))
382 },
383 Err(e) => {
384 self.AppState
385 .UpdateRequestStatus(
386 &request_id,
387 crate::ApplicationState::RequestState::Failed(e.to_string()),
388 None,
389 )
390 .await
391 .ok();
392 error!("[AirVinegRPCService] Update check failed: {}", e);
393 Ok(Response::new(air_generated::UpdateCheckResponse {
394 request_id,
395 update_available:false,
396 version:String::new(),
397 download_url:String::new(),
398 release_notes:String::new(),
399 error:e.to_string(),
400 }))
401 },
402 }
403 }
404
405 async fn download_file(
407 &self,
408 request:Request<DownloadRequest>,
409 ) -> std::result::Result<Response<DownloadResponse>, Status> {
410 let RequestData = request.into_inner();
411 let request_id = RequestData.request_id.clone();
412
413 info!(
414 "[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
415 request_id, RequestData.url
416 );
417
418 let download_request_id = if request_id.is_empty() {
420 crate::Utility::GenerateRequestId()
421 } else {
422 request_id.clone()
423 };
424
425 self.AppState
426 .RegisterRequest(download_request_id.clone(), "downloader".to_string())
427 .await
428 .map_err(|e| Status::internal(e.to_string()))?;
429
430 if RequestData.url.is_empty() {
432 let error_msg = "URL cannot be empty".to_string();
433 self.AppState
434 .UpdateRequestStatus(
435 &download_request_id,
436 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
437 None,
438 )
439 .await
440 .ok();
441 return Ok(Response::new(DownloadResponse {
442 request_id:download_request_id,
443 success:false,
444 file_path:String::new(),
445 file_size:0,
446 checksum:String::new(),
447 error:error_msg,
448 }));
449 }
450
451 if !match_url_scheme(&RequestData.url) {
453 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
454 self.AppState
455 .UpdateRequestStatus(
456 &download_request_id,
457 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
458 None,
459 )
460 .await
461 .ok();
462 return Ok(Response::new(DownloadResponse {
463 request_id:download_request_id,
464 success:false,
465 file_path:String::new(),
466 file_size:0,
467 checksum:String::new(),
468 error:error_msg,
469 }));
470 }
471
472 let DestinationPath = if RequestData.destination_path.is_empty() {
474 let config = &self.AppState.Configuration.Downloader;
476 config.CacheDirectory.clone()
477 } else {
478 RequestData.destination_path.clone()
479 };
480
481 let dest_path = std::path::Path::new(&DestinationPath);
483 if let Some(parent) = dest_path.parent() {
484 if !parent.exists() {
485 match tokio::fs::create_dir_all(parent).await {
486 Ok(_) => {
487 debug!("[AirVinegRPCService] Created destination directory: {}", parent.display());
488 },
489 Err(e) => {
490 let error_msg = format!("Failed to create destination directory: {}", e);
491 self.AppState
492 .UpdateRequestStatus(
493 &download_request_id,
494 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
495 None,
496 )
497 .await
498 .ok();
499 return Ok(Response::new(DownloadResponse {
500 request_id:download_request_id,
501 success:false,
502 file_path:String::new(),
503 file_size:0,
504 checksum:String::new(),
505 error:error_msg,
506 }));
507 },
508 }
509 }
510 }
511
512 let _download_manager = self.DownloadManager.clone();
514 let AppState = self.AppState.clone();
515 let callback_request_id = download_request_id.clone();
516 let progress_callback = move |progress:f32| {
517 let state = AppState.clone();
518 let id = callback_request_id.clone();
519 tokio::spawn(async move {
520 let _ = state
521 .UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
522 .await;
523 });
524 };
525
526 let result = self
528 .download_file_with_retry(
529 &download_request_id,
530 RequestData.url.clone(),
531 DestinationPath,
532 RequestData.checksum,
533 Some(Box::new(progress_callback)),
534 )
535 .await;
536
537 match result {
538 Ok(file_info) => {
539 self.AppState
540 .UpdateRequestStatus(
541 &download_request_id,
542 crate::ApplicationState::RequestState::Completed,
543 Some(100.0),
544 )
545 .await
546 .ok();
547
548 info!(
549 "[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
550 download_request_id, file_info.size
551 );
552
553 Ok(Response::new(DownloadResponse {
554 request_id:download_request_id,
555 success:true,
556 file_path:file_info.path,
557 file_size:file_info.size,
558 checksum:file_info.checksum,
559 error:String::new(),
560 }))
561 },
562 Err(e) => {
563 self.AppState
564 .UpdateRequestStatus(
565 &download_request_id,
566 crate::ApplicationState::RequestState::Failed(e.to_string()),
567 None,
568 )
569 .await
570 .ok();
571
572 error!(
573 "[AirVinegRPCService] Download failed [ID: {}] - Error: {}",
574 download_request_id, e
575 );
576
577 Ok(Response::new(DownloadResponse {
578 request_id:download_request_id,
579 success:false,
580 file_path:String::new(),
581 file_size:0,
582 checksum:String::new(),
583 error:e.to_string(),
584 }))
585 },
586 }
587 }
588
589 async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
591 let RequestData = request.into_inner();
592 let request_id = RequestData.request_id;
593
594 info!(
595 "[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
596 request_id, RequestData.path
597 );
598
599 self.AppState
600 .RegisterRequest(request_id.clone(), "indexing".to_string())
601 .await
602 .map_err(|e| Status::internal(e.to_string()))?;
603
604 let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
605
606 match result {
607 Ok(index_info) => {
608 self.AppState
609 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
610 .await
611 .ok();
612
613 Ok(Response::new(air_generated::IndexResponse {
614 request_id,
615 success:true,
616 files_indexed:index_info.files_indexed,
617 total_size:index_info.total_size,
618 error:String::new(),
619 }))
620 },
621 Err(e) => {
622 self.AppState
623 .UpdateRequestStatus(
624 &request_id,
625 crate::ApplicationState::RequestState::Failed(e.to_string()),
626 None,
627 )
628 .await
629 .ok();
630
631 Ok(Response::new(air_generated::IndexResponse {
632 request_id,
633 success:false,
634 files_indexed:0,
635 total_size:0,
636 error:e.to_string(),
637 }))
638 },
639 }
640 }
641
642 async fn get_status(
644 &self,
645 request:Request<StatusRequest>,
646 ) -> std::result::Result<Response<StatusResponse>, Status> {
647 let _RequestData = request.into_inner();
648
649 debug!("[AirVinegRPCService] Status request received");
650
651 let metrics = self.AppState.GetMetrics().await;
652 let resources = self.AppState.GetResourceUsage().await;
653
654 Ok(Response::new(air_generated::StatusResponse {
655 version:crate::VERSION.to_string(),
656 uptime_seconds:metrics.UptimeSeconds,
657 total_requests:metrics.TotalRequests,
658 successful_requests:metrics.SuccessfulRequests,
659 failed_requests:metrics.FailedRequests,
660 average_response_time:metrics.AverageResponseTime,
661 memory_usage_mb:resources.MemoryUsageMb,
662 cpu_usage_percent:resources.CPUUsagePercent,
663 active_requests:self.AppState.GetActiveRequestCount().await as u32,
664 }))
665 }
666
667 async fn health_check(
669 &self,
670 _request:Request<HealthCheckRequest>,
671 ) -> std::result::Result<Response<HealthCheckResponse>, Status> {
672 debug!("[AirVinegRPCService] Health check request received");
673
674 Ok(Response::new(air_generated::HealthCheckResponse {
675 healthy:true,
676 timestamp:CurrentTimestamp(),
677 }))
678 }
679
680 async fn download_update(
684 &self,
685 request:Request<DownloadRequest>,
686 ) -> std::result::Result<Response<DownloadResponse>, Status> {
687 let RequestData = request.into_inner();
688 let request_id = RequestData.request_id.clone();
689
690 info!(
691 "[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
692 request_id, RequestData.url, RequestData.destination_path
693 );
694
695 self.AppState
696 .RegisterRequest(request_id.clone(), "download_update".to_string())
697 .await
698 .map_err(|e| Status::internal(e.to_string()))?;
699
700 if RequestData.url.is_empty() {
702 let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
703 self.AppState
704 .UpdateRequestStatus(
705 &request_id,
706 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
707 None,
708 )
709 .await
710 .ok();
711 return Err(Status::invalid_argument(error_msg.to_string()));
712 }
713
714 if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
716 let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
717 self.AppState
718 .UpdateRequestStatus(
719 &request_id,
720 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
721 None,
722 )
723 .await
724 .ok();
725 return Err(Status::invalid_argument(error_msg.to_string()));
726 }
727
728 let destination = if RequestData.destination_path.is_empty() {
730 self.UpdateManager
732 .GetCacheDirectory()
733 .join("update-latest.bin")
734 .to_string_lossy()
735 .to_string()
736 } else {
737 let dest_path = std::path::Path::new(&RequestData.destination_path);
739 if let Some(parent) = dest_path.parent() {
740 if parent.as_os_str().is_empty() {
741 self.UpdateManager
743 .GetCacheDirectory()
744 .join(&RequestData.destination_path)
745 .to_string_lossy()
746 .to_string()
747 } else {
748 RequestData.destination_path.clone()
750 }
751 } else {
752 RequestData.destination_path.clone()
753 }
754 };
755
756 let dest_path = std::path::Path::new(&destination);
758 if let Some(parent) = dest_path.parent() {
759 if !parent.exists() {
760 return Err(Status::failed_precondition(format!(
761 "Destination directory does not exist: {}",
762 parent.display()
763 )));
764 }
765
766 if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
768 let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
769 self.AppState
770 .UpdateRequestStatus(
771 &request_id,
772 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
773 None,
774 )
775 .await
776 .ok();
777 return Err(Status::permission_denied(error_msg.to_string()));
778 }
779 let _ = std::fs::remove_file(parent.join(".write_test"));
781 }
782
783 let download_result = self
786 .DownloadManager
787 .DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
788 .await;
789
790 match download_result {
791 Ok(result) => {
792 self.AppState
793 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
794 .await
795 .ok();
796
797 info!(
798 "[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
799 result.path, result.size, result.checksum
800 );
801
802 Ok(Response::new(DownloadResponse {
803 request_id,
804 success:true,
805 file_path:result.path,
806 file_size:result.size,
807 checksum:result.checksum,
808 error:String::new(),
809 }))
810 },
811 Err(crate::AirError::Network(e)) => {
812 self.AppState
813 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
814 .await
815 .ok();
816 error!("[AirVinegRPCService] Download update network error: {}", e);
817 Err(Status::unavailable(e))
818 },
819 Err(crate::AirError::FileSystem(e)) => {
820 self.AppState
821 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
822 .await
823 .ok();
824 error!("[AirVinegRPCService] Download update filesystem error: {}", e);
825 Err(Status::internal(e))
826 },
827 Err(e) => {
828 self.AppState
829 .UpdateRequestStatus(
830 &request_id,
831 crate::ApplicationState::RequestState::Failed(e.to_string()),
832 None,
833 )
834 .await
835 .ok();
836 error!("[AirVinegRPCService] Download update failed: {}", e);
837 Ok(Response::new(DownloadResponse {
838 request_id,
839 success:false,
840 file_path:String::new(),
841 file_size:0,
842 checksum:String::new(),
843 error:e.to_string(),
844 }))
845 },
846 }
847 }
848
849 async fn apply_update(
851 &self,
852 request:Request<ApplyUpdateRequest>,
853 ) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
854 let RequestData = request.into_inner();
855 let request_id = RequestData.request_id.clone();
856
857 info!(
858 "[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
859 request_id, RequestData.version, RequestData.update_path
860 );
861
862 self.AppState
863 .RegisterRequest(request_id.clone(), "apply_update".to_string())
864 .await
865 .map_err(|e| Status::internal(e.to_string()))?;
866
867 if RequestData.version.is_empty() {
869 let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
870 self.AppState
871 .UpdateRequestStatus(
872 &request_id,
873 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
874 None,
875 )
876 .await
877 .ok();
878 return Err(Status::invalid_argument(error_msg.to_string()));
879 }
880
881 if RequestData.update_path.is_empty() {
883 let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
884 self.AppState
885 .UpdateRequestStatus(
886 &request_id,
887 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
888 None,
889 )
890 .await
891 .ok();
892 return Err(Status::invalid_argument(error_msg.to_string()));
893 }
894
895 let update_path = std::path::Path::new(&RequestData.update_path);
896
897 if !update_path.exists() {
899 let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
900 self.AppState
901 .UpdateRequestStatus(
902 &request_id,
903 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
904 None,
905 )
906 .await
907 .ok();
908 return Err(Status::not_found(error_msg.to_string()));
909 }
910
911 let metadata = match std::fs::metadata(update_path) {
913 Ok(m) => m,
914 Err(e) => {
915 let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
916 self.AppState
917 .UpdateRequestStatus(
918 &request_id,
919 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
920 None,
921 )
922 .await
923 .ok();
924 return Err(Status::internal(error_msg.to_string()));
925 },
926 };
927
928 if metadata.len() == 0 {
929 let error_msg = crate::AirError::Validation("Update file is empty".to_string());
930 self.AppState
931 .UpdateRequestStatus(
932 &request_id,
933 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
934 None,
935 )
936 .await
937 .ok();
938 return Err(Status::failed_precondition(error_msg.to_string()));
939 }
940
941 let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
943 if let Err(ref e) = rollback_backup_path {
944 warn!(
945 "[AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback capability.",
946 e
947 );
948 }
949
950 match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
952 Ok(true) => {
953 info!("[AirVinegRPCService] Update verification successful, preparing for installation");
954
955 self.AppState
956 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
957 .await
958 .ok();
959
960 let AppState = self.AppState.clone();
962 let version = RequestData.version.clone();
963 let self_clone = self.clone();
964
965 tokio::spawn(async move {
966 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
967 log::info!(
968 "[AirVinegRPCService] Initiating graceful shutdown for update version {}",
969 version
970 );
971
972 if let Err(e) = AppState.StopAllBackgroundTasks().await {
974 log::error!("[AirVinegRPCService] Failed to initiate graceful shutdown: {}", e);
975
976 log::warn!("[AirVinegRPCService] Rollback initiated due to graceful shutdown failure");
978 if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
979 log::error!("[AirVinegRPCService] Rollback failed: {}", rollback_error);
980 } else {
981 log::info!("[AirVinegRPCService] Rollback completed successfully");
982 }
983 }
984 });
985
986 Ok(Response::new(ApplyUpdateResponse {
987 request_id,
988 success:true,
989 error:String::new(),
990 }))
991 },
992 Ok(false) => {
993 let error_msg = "Update verification failed: checksum mismatch".to_string();
994 self.AppState
995 .UpdateRequestStatus(
996 &request_id,
997 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
998 None,
999 )
1000 .await
1001 .ok();
1002 error!("[AirVinegRPCService] {}", error_msg);
1003
1004 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1006
1007 Err(Status::failed_precondition(error_msg))
1008 },
1009 Err(crate::AirError::FileSystem(e)) => {
1010 self.AppState
1011 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1012 .await
1013 .ok();
1014 error!("[AirVinegRPCService] Update verification filesystem error: {}", e);
1015
1016 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1018
1019 Err(Status::internal(e))
1020 },
1021 Err(e) => {
1022 self.AppState
1023 .UpdateRequestStatus(
1024 &request_id,
1025 crate::ApplicationState::RequestState::Failed(e.to_string()),
1026 None,
1027 )
1028 .await
1029 .ok();
1030 error!("[AirVinegRPCService] Update verification error: {}", e);
1031
1032 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1034
1035 Ok(Response::new(ApplyUpdateResponse {
1036 request_id,
1037 success:false,
1038 error:e.to_string(),
1039 }))
1040 },
1041 }
1042 }
1043
1044 type DownloadStreamStream =
1049 tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1050
1051 async fn download_stream(
1052 &self,
1053 request:Request<DownloadStreamRequest>,
1054 ) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1055 let RequestData = request.into_inner();
1056 let request_id = RequestData.request_id.clone();
1057
1058 info!(
1059 "[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1060 request_id, RequestData.url
1061 );
1062
1063 self.AppState
1064 .RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1065 .await
1066 .map_err(|e| Status::internal(e.to_string()))?;
1067
1068 if RequestData.url.is_empty() {
1070 let error_msg = "URL cannot be empty".to_string();
1071 self.AppState
1072 .UpdateRequestStatus(
1073 &request_id,
1074 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1075 None,
1076 )
1077 .await
1078 .ok();
1079 return Err(Status::invalid_argument(error_msg));
1080 }
1081
1082 if !match_url_scheme(&RequestData.url) {
1084 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1085 self.AppState
1086 .UpdateRequestStatus(
1087 &request_id,
1088 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1089 None,
1090 )
1091 .await
1092 .ok();
1093 return Err(Status::invalid_argument(error_msg));
1094 }
1095
1096 match self.validate_range_support(&RequestData.url).await {
1098 Ok(true) => {
1099 debug!("[AirVinegRPCService] URL supports range headers");
1100 },
1101 Ok(false) => {
1102 warn!("[AirVinegRPCService] URL does not support range headers, streaming may be inefficient");
1103 },
1104 Err(e) => {
1105 let error_msg = format!("Failed to validate range support: {}", e);
1106 self.AppState
1107 .UpdateRequestStatus(
1108 &request_id,
1109 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1110 None,
1111 )
1112 .await
1113 .ok();
1114 return Err(Status::internal(error_msg));
1115 },
1116 }
1117
1118 let (tx, rx) = tokio::sync::mpsc::channel(100);
1120
1121 let chunk_size = 8 * 1024 * 1024; let url = RequestData.url.clone();
1126 let headers = RequestData.headers;
1127 let download_request_id = request_id.clone();
1128 let _download_manager = self.DownloadManager.clone();
1129 let AppState = self.AppState.clone();
1130
1131 tokio::spawn(async move {
1133 if tx
1135 .send(Ok(DownloadStreamResponse {
1136 request_id:download_request_id.clone(),
1137 chunk:vec![].into(),
1138 total_size:0,
1139 downloaded:0,
1140 completed:false,
1141 error:String::new(),
1142 }))
1143 .await
1144 .is_err()
1145 {
1146 log::warn!(
1147 "[AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1148 download_request_id
1149 );
1150 return;
1151 }
1152
1153 let client = reqwest::Client::builder()
1155 .pool_idle_timeout(std::time::Duration::from_secs(60))
1156 .pool_max_idle_per_host(5)
1157 .timeout(std::time::Duration::from_secs(300))
1158 .build();
1159
1160 if client.is_err() {
1161 let error = client.unwrap_err().to_string();
1162 let _ = tx
1163 .send(Ok(DownloadStreamResponse {
1164 request_id:download_request_id.clone(),
1165 chunk:vec![].into(),
1166 total_size:0,
1167 downloaded:0,
1168 completed:false,
1169 error:error.clone(),
1170 }))
1171 .await;
1172 AppState
1173 .UpdateRequestStatus(
1174 &download_request_id,
1175 crate::ApplicationState::RequestState::Failed(error),
1176 None,
1177 )
1178 .await
1179 .ok();
1180 return;
1181 }
1182
1183 let client = match client {
1184 Ok(client) => client,
1185 Err(e) => {
1186 let error = format!("Failed to create HTTP client: {}", e);
1187 let _ = tx.send(Err(Status::internal(error.clone())));
1188 AppState
1189 .UpdateRequestStatus(
1190 &download_request_id,
1191 crate::ApplicationState::RequestState::Failed(error),
1192 None,
1193 )
1194 .await
1195 .ok();
1196 return;
1197 },
1198 };
1199
1200 let mut total_size:Option<u64> = None;
1202 let mut total_downloaded:u64 = 0;
1203
1204 match client
1205 .get(&url)
1206 .headers({
1207 let mut map = reqwest::header::HeaderMap::new();
1208 for (key, value) in headers {
1209 if let (Ok(header_name), Ok(header_value)) = (
1210 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1211 reqwest::header::HeaderValue::from_str(&value),
1212 ) {
1213 map.insert(header_name, header_value);
1214 }
1215 }
1216 map
1217 })
1218 .send()
1219 .await
1220 {
1221 Ok(response) => {
1222 if !response.status().is_success() {
1223 let error = format!("Download failed with status: {}", response.status());
1224 let _ = tx
1225 .send(Ok(DownloadStreamResponse {
1226 request_id:download_request_id.clone(),
1227 chunk:vec![].into(),
1228 total_size:0,
1229 downloaded:0,
1230 completed:false,
1231 error:error.clone(),
1232 }))
1233 .await;
1234 AppState
1235 .UpdateRequestStatus(
1236 &download_request_id,
1237 crate::ApplicationState::RequestState::Failed(error),
1238 None,
1239 )
1240 .await
1241 .ok();
1242 return;
1243 }
1244
1245 total_size = Some(response.content_length().unwrap_or(0));
1246 let response_tx = tx.clone();
1247 let response_id = download_request_id.clone();
1248
1249 let mut stream = response.bytes_stream();
1251 let mut buffer = Vec::with_capacity(chunk_size);
1252 let mut last_progress:f32 = 0.0;
1253
1254 while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1255 if AppState.IsRequestCancelled(&download_request_id).await {
1256 log::info!(
1257 "[AirVinegRPCService] Download cancelled by client [ID: {}]",
1258 download_request_id
1259 );
1260 AppState
1261 .UpdateRequestStatus(
1262 &download_request_id,
1263 crate::ApplicationState::RequestState::Cancelled,
1264 None,
1265 )
1266 .await
1267 .ok();
1268 return;
1269 }
1270
1271 match chunk_result {
1272 Ok(chunk) => {
1273 buffer.extend_from_slice(&chunk);
1274 total_downloaded += chunk.len() as u64;
1275
1276 if buffer.len() >= chunk_size {
1278 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1280
1281 let progress = if let Some(ts) = total_size {
1283 if ts > 0 {
1284 (total_downloaded as f32 / ts as f32) * 100.0
1285 } else {
1286 0.0
1287 }
1288 } else {
1289 0.0
1290 };
1291
1292 if progress - last_progress >= 5.0 {
1294 AppState
1295 .UpdateRequestStatus(
1296 &download_request_id,
1297 crate::ApplicationState::RequestState::InProgress,
1298 Some(progress),
1299 )
1300 .await
1301 .ok();
1302 last_progress = progress;
1303 }
1304
1305 if response_tx
1306 .send(Ok(DownloadStreamResponse {
1307 request_id:response_id.clone(),
1308 chunk:buffer.clone().into(),
1309 total_size: total_size.unwrap_or(0),
1310 downloaded:total_downloaded,
1311 completed:false,
1312 error:String::new(),
1313 }))
1314 .await
1315 .is_err()
1316 {
1317 log::warn!(
1318 "[AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1319 download_request_id
1320 );
1321 AppState
1322 .UpdateRequestStatus(
1323 &download_request_id,
1324 crate::ApplicationState::RequestState::Failed(
1325 "Client disconnected".to_string(),
1326 ),
1327 None,
1328 )
1329 .await
1330 .ok();
1331 return;
1332 }
1333
1334 debug!(
1335 "[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1336 buffer.len(),
1337 download_request_id,
1338 progress
1339 );
1340
1341 buffer.clear();
1342 }
1343 },
1344 Err(e) => {
1345 let error = format!("Download error: {}", e);
1346 log::error!(
1347 "[AirVinegRPCService] Stream download failed [ID: {}]: {}",
1348 download_request_id,
1349 error
1350 );
1351
1352 let _ = response_tx
1353 .send(Ok(DownloadStreamResponse {
1354 request_id:response_id.clone(),
1355 chunk:vec![].into(),
1356 total_size: total_size.unwrap_or(0),
1357 downloaded:total_downloaded,
1358 completed:false,
1359 error:error.clone(),
1360 }))
1361 .await;
1362
1363 AppState
1364 .UpdateRequestStatus(
1365 &download_request_id,
1366 crate::ApplicationState::RequestState::Failed(error),
1367 None,
1368 )
1369 .await
1370 .ok();
1371 return;
1372 },
1373 }
1374 }
1375
1376 if !buffer.is_empty() {
1378 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1379
1380 if tx
1381 .send(Ok(DownloadStreamResponse {
1382 request_id:download_request_id.clone(),
1383 chunk:buffer.into(),
1384 total_size: total_size.unwrap_or(0),
1385 downloaded:total_downloaded,
1386 completed:false,
1387 error:String::new(),
1388 }))
1389 .await
1390 .is_err()
1391 {
1392 log::warn!(
1393 "[AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1394 download_request_id
1395 );
1396 return;
1397 }
1398 }
1399
1400 AppState
1402 .UpdateRequestStatus(
1403 &download_request_id,
1404 crate::ApplicationState::RequestState::Completed,
1405 Some(100.0),
1406 )
1407 .await
1408 .ok();
1409
1410 let _ = tx
1411 .send(Ok(DownloadStreamResponse {
1412 request_id,
1413 chunk:vec![].into(),
1414 total_size: total_size.unwrap_or(0),
1415 downloaded:total_downloaded,
1416 completed:true,
1417 error:String::new(),
1418 }))
1419 .await;
1420
1421 info!(
1422 "[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1423 download_request_id, total_downloaded
1424 );
1425 },
1426 Err(e) => {
1427 let error = format!("Failed to start streaming download: {}", e);
1428 log::error!(
1429 "[AirVinegRPCService] Stream download error [ID: {}]: {}",
1430 download_request_id,
1431 error
1432 );
1433
1434 let _ = tx
1435 .send(Ok(DownloadStreamResponse {
1436 request_id:download_request_id.clone(),
1437 chunk:vec![].into(),
1438 total_size:0,
1439 downloaded:0,
1440 completed:false,
1441 error:error.clone(),
1442 }))
1443 .await;
1444
1445 AppState
1446 .UpdateRequestStatus(
1447 &download_request_id,
1448 crate::ApplicationState::RequestState::Failed(error),
1449 None,
1450 )
1451 .await
1452 .ok();
1453 },
1454 }
1455 });
1456
1457 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1458 }
1459
1460 async fn search_files(
1464 &self,
1465 request:Request<SearchRequest>,
1466 ) -> std::result::Result<Response<SearchResponse>, Status> {
1467 let RequestData = request.into_inner();
1468 let request_id = RequestData.request_id.clone();
1469
1470 debug!(
1471 "[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1472 RequestData.query, RequestData.path
1473 );
1474
1475 if RequestData.query.is_empty() {
1477 return Ok(Response::new(SearchResponse {
1478 request_id,
1479 results:vec![],
1480 total_results:0,
1481 error:"Search query cannot be empty".to_string(),
1482 }));
1483 }
1484
1485 let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1487 let _search_path = path.as_deref();
1488
1489 match self
1490 .FileIndexer
1491 .SearchFiles(
1492 SearchQuery {
1493 query:RequestData.query.clone(),
1494 mode:SearchMode::Literal,
1495 case_sensitive:false,
1496 whole_word:false,
1497 regex:None,
1498 max_results:RequestData.max_results,
1499 page:1,
1500 },
1501 path,
1502 None,
1503 )
1504 .await
1505 {
1506 Ok(search_results) => {
1507 let mut file_results = Vec::new();
1509 for r in search_results {
1510 let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1512 (first_match.line_content.clone(), first_match.line_number)
1513 } else {
1514 (String::new(), 0)
1515 };
1516
1517 let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1519 metadata.size
1520 } else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1521 file_metadata.len()
1522 } else {
1523 0
1524 };
1525
1526 file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1527 }
1528
1529 info!("[AirVinegRPCService] Search completed: {} results found", file_results.len());
1530
1531 let result_count = file_results.len();
1532 Ok(Response::new(SearchResponse {
1533 request_id,
1534 results:file_results,
1535 total_results:result_count as u32,
1536 error:String::new(),
1537 }))
1538 },
1539 Err(e) => {
1540 error!("[AirVinegRPCService] Search failed: {}", e);
1541 Ok(Response::new(SearchResponse {
1542 request_id,
1543 results:vec![],
1544 total_results:0,
1545 error:e.to_string(),
1546 }))
1547 },
1548 }
1549 }
1550
1551 async fn get_file_info(
1553 &self,
1554 request:Request<FileInfoRequest>,
1555 ) -> std::result::Result<Response<FileInfoResponse>, Status> {
1556 let RequestData = request.into_inner();
1557 let request_id = RequestData.request_id.clone();
1558
1559 debug!("[AirVinegRPCService] Get file info request: {}", RequestData.path);
1560
1561 if RequestData.path.is_empty() {
1563 return Ok(Response::new(FileInfoResponse {
1564 request_id,
1565 exists:false,
1566 size:0,
1567 mime_type:String::new(),
1568 checksum:String::new(),
1569 modified_time:0,
1570 error:"Path cannot be empty".to_string(),
1571 }));
1572 }
1573
1574 use std::path::Path;
1576 let path = Path::new(&RequestData.path);
1577
1578 if !path.exists() {
1579 return Ok(Response::new(FileInfoResponse {
1580 request_id,
1581 exists:false,
1582 size:0,
1583 mime_type:String::new(),
1584 checksum:String::new(),
1585 modified_time:0,
1586 error:String::new(), }));
1588 }
1589
1590 match std::fs::metadata(path) {
1592 Ok(metadata) => {
1593 let modified_time = metadata
1594 .modified()
1595 .ok()
1596 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1597 .map(|d| d.as_secs())
1598 .unwrap_or(0);
1599
1600 let mime_type = self.detect_mime_type(path);
1602
1603 let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1605 log::warn!("[AirVinegRPCService] Failed to calculate checksum: {}", e);
1606 String::new()
1607 });
1608
1609 Ok(Response::new(FileInfoResponse {
1610 request_id,
1611 exists:true,
1612 size:metadata.len(),
1613 mime_type,
1614 checksum,
1615 modified_time,
1616 error:String::new(),
1617 }))
1618 },
1619 Err(e) => {
1620 error!("[AirVinegRPCService] Failed to get file metadata: {}", e);
1621 Ok(Response::new(FileInfoResponse {
1622 request_id,
1623 exists:false,
1624 size:0,
1625 mime_type:String::new(),
1626 checksum:String::new(),
1627 modified_time:0,
1628 error:e.to_string(),
1629 }))
1630 },
1631 }
1632 }
1633
1634 async fn get_metrics(
1638 &self,
1639 request:Request<MetricsRequest>,
1640 ) -> std::result::Result<Response<MetricsResponse>, Status> {
1641 let RequestData = request.into_inner();
1642 let request_id = RequestData.request_id.clone();
1643
1644 debug!("[AirVinegRPCService] Get metrics request: type='{}'", RequestData.metric_type);
1645
1646 let metrics = self.AppState.GetMetrics().await;
1647 let mut metrics_map = std::collections::HashMap::new();
1648
1649 if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1651 metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1652 metrics_map.insert("total_requests".to_string(), metrics.TotalRequests.to_string());
1653 metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequests.to_string());
1654 metrics_map.insert("failed_requests".to_string(), metrics.FailedRequests.to_string());
1655 metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1656 }
1657
1658 if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1660 metrics_map.insert(
1661 "ActiveRequests".to_string(),
1662 self.AppState.GetActiveRequestCount().await.to_string(),
1663 );
1664 }
1665
1666 Ok(Response::new(MetricsResponse {
1667 request_id,
1668 metrics:metrics_map,
1669 error:String::new(),
1670 }))
1671 }
1672
1673 async fn get_resource_usage(
1675 &self,
1676 request:Request<ResourceUsageRequest>,
1677 ) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1678 let RequestData = request.into_inner();
1679 let request_id = RequestData.request_id.clone();
1680
1681 debug!("[AirVinegRPCService] Get resource usage request");
1682
1683 let resources = self.AppState.GetResourceUsage().await;
1684
1685 Ok(Response::new(ResourceUsageResponse {
1686 request_id,
1687 memory_usage_mb:resources.MemoryUsageMb,
1688 cpu_usage_percent:resources.CPUUsagePercent,
1689 disk_usage_mb:resources.DiskUsageMb,
1690 network_usage_mbps:resources.NetworkUsageMbps,
1691 error:String::new(),
1692 }))
1693 }
1694
1695 async fn set_resource_limits(
1697 &self,
1698 request:Request<ResourceLimitsRequest>,
1699 ) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1700 let RequestData = request.into_inner();
1701 let request_id = RequestData.request_id.clone();
1702
1703 info!(
1704 "[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1705 RequestData.memory_limit_mb, RequestData.cpu_limit_percent, RequestData.disk_limit_mb
1706 );
1707
1708 if RequestData.memory_limit_mb == 0 {
1710 return Ok(Response::new(ResourceLimitsResponse {
1711 request_id,
1712 success:false,
1713 error:"Memory limit must be greater than 0".to_string(),
1714 }));
1715 }
1716
1717 if RequestData.cpu_limit_percent > 100 {
1718 return Ok(Response::new(ResourceLimitsResponse {
1719 request_id,
1720 success:false,
1721 error:"CPU limit cannot exceed 100%".to_string(),
1722 }));
1723 }
1724
1725 let result = self
1727 .AppState
1728 .SetResourceLimits(
1729 Some(RequestData.memory_limit_mb as u64),
1730 Some(RequestData.cpu_limit_percent as f64),
1731 Some(RequestData.disk_limit_mb as u64),
1732 )
1733 .await;
1734
1735 match result {
1736 Ok(_) => {
1737 Ok(Response::new(ResourceLimitsResponse {
1738 request_id,
1739 success:true,
1740 error:String::new(),
1741 }))
1742 },
1743 Err(e) => {
1744 Ok(Response::new(ResourceLimitsResponse {
1745 request_id,
1746 success:false,
1747 error:e.to_string(),
1748 }))
1749 },
1750 }
1751 }
1752
1753 async fn get_configuration(
1757 &self,
1758 request:Request<ConfigurationRequest>,
1759 ) -> std::result::Result<Response<ConfigurationResponse>, Status> {
1760 let RequestData = request.into_inner();
1761 let request_id = RequestData.request_id.clone();
1762
1763 debug!(
1764 "[AirVinegRPCService] Get configuration request: section='{}'",
1765 RequestData.section
1766 );
1767
1768 let config = self.AppState.GetConfiguration().await;
1770 let mut config_map = std::collections::HashMap::new();
1771
1772 match RequestData.section.as_str() {
1774 "grpc" => {
1775 config_map.insert("bind_address".to_string(), config.gRPC.BindAddress.clone());
1776 config_map.insert("max_connections".to_string(), config.gRPC.MaxConnections.to_string());
1777 config_map.insert("request_timeout_secs".to_string(), config.gRPC.RequestTimeoutSecs.to_string());
1778 },
1779 "authentication" => {
1780 config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
1781 config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
1782 config_map.insert(
1783 "token_expiration_hours".to_string(),
1784 config.Authentication.TokenExpirationHours.to_string(),
1785 );
1786 },
1787 "updates" => {
1788 config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
1789 config_map.insert(
1790 "check_interval_hours".to_string(),
1791 config.Updates.CheckIntervalHours.to_string(),
1792 );
1793 config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
1794 config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
1795 config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
1796 },
1797 "downloader" => {
1798 config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
1799 config_map.insert(
1800 "max_concurrent_downloads".to_string(),
1801 config.Downloader.MaxConcurrentDownloads.to_string(),
1802 );
1803 config_map.insert(
1804 "download_timeout_secs".to_string(),
1805 config.Downloader.DownloadTimeoutSecs.to_string(),
1806 );
1807 config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
1808 config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
1809 },
1810 "indexing" => {
1811 config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
1812 config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
1813 config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
1814 config_map.insert(
1815 "update_interval_minutes".to_string(),
1816 config.Indexing.UpdateIntervalMinutes.to_string(),
1817 );
1818 config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
1819 },
1820 _ => {
1821 config_map.insert("_grpc_enabled".to_string(), "true".to_string());
1823 },
1824 }
1825
1826 Ok(Response::new(ConfigurationResponse {
1827 request_id,
1828 configuration:config_map,
1829 error:String::new(),
1830 }))
1831 }
1832
1833 async fn update_configuration(
1835 &self,
1836 request:Request<UpdateConfigurationRequest>,
1837 ) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
1838 let RequestData = request.into_inner();
1839 let request_id = RequestData.request_id.clone();
1840
1841 info!(
1842 "[AirVinegRPCService] Update configuration request: section='{}'",
1843 RequestData.section
1844 );
1845
1846 if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
1848 {
1849 return Ok(Response::new(UpdateConfigurationResponse {
1850 request_id,
1851 success:false,
1852 error:"Invalid configuration section".to_string(),
1853 }));
1854 }
1855
1856 let result = self
1858 .AppState
1859 .UpdateConfiguration(RequestData.section, RequestData.updates)
1860 .await;
1861
1862 match result {
1863 Ok(_) => {
1864 Ok(Response::new(UpdateConfigurationResponse {
1865 request_id,
1866 success:true,
1867 error:String::new(),
1868 }))
1869 },
1870 Err(e) => {
1871 Ok(Response::new(UpdateConfigurationResponse {
1872 request_id,
1873 success:false,
1874 error:e.to_string(),
1875 }))
1876 },
1877 }
1878 }
1879}
1880
1881impl AirVinegRPCService {
1884 fn detect_mime_type(&self, path:&std::path::Path) -> String {
1886 match path.extension().and_then(|e| e.to_str()) {
1887 Some("rs") => "text/x-rust".to_string(),
1888 Some("ts") => "application/typescript".to_string(),
1889 Some("js") => "application/javascript".to_string(),
1890 Some("json") => "application/json".to_string(),
1891 Some("toml") => "application/toml".to_string(),
1892 Some("md") => "text/markdown".to_string(),
1893 Some("txt") => "text/plain".to_string(),
1894 Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
1895 Some("html") => "text/html".to_string(),
1896 Some("css") => "text/css".to_string(),
1897 Some("xml") => "application/xml".to_string(),
1898 Some("png") => "image/png".to_string(),
1899 Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
1900 Some("gif") => "image/gif".to_string(),
1901 Some("svg") => "image/svg+xml".to_string(),
1902 Some("pdf") => "application/pdf".to_string(),
1903 Some("zip") => "application/zip".to_string(),
1904 Some("tar") | Some("gz") => "application/x-tar".to_string(),
1905 Some("proto") => "application/x-protobuf".to_string(),
1906 _ => "application/octet-stream".to_string(),
1907 }
1908 }
1909
1910 async fn download_file_with_retry(
1913 &self,
1914 request_id:&str,
1915 url:String,
1916 DestinationPath:String,
1917 checksum:String,
1918 progress_callback:Option<Box<dyn Fn(f32) + Send>>,
1919 ) -> Result<crate::Downloader::DownloadResult> {
1920 let config = &self.AppState.Configuration.Downloader;
1921 let mut retries = 0;
1922
1923 loop {
1924 match self
1925 .DownloadManager
1926 .DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
1927 .await
1928 {
1929 Ok(file_info) => {
1930 if let Some(ref callback) = progress_callback {
1931 callback(100.0);
1932 }
1933 return Ok(file_info);
1934 },
1935 Err(e) => {
1936 if retries < config.MaxRetries as usize {
1937 retries += 1;
1938 let backoff_secs = 2u64.pow(retries as u32);
1939 warn!(
1940 "[AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - Backing \
1941 off {} seconds",
1942 request_id, retries, config.MaxRetries, e, backoff_secs
1943 );
1944
1945 if let Some(ref callback) = progress_callback {
1946 let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
1948 callback(progress);
1949 }
1950
1951 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
1952 } else {
1953 error!(
1954 "[AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
1955 config.MaxRetries, request_id, e
1956 );
1957 return Err(e);
1958 }
1959 },
1960 }
1961 }
1962 }
1963
1964 async fn validate_range_support(&self, url:&str) -> Result<bool> {
1966 let client = reqwest::Client::builder()
1967 .timeout(std::time::Duration::from_secs(10))
1968 .build()
1969 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
1970
1971 let response = client
1972 .head(url)
1973 .send()
1974 .await
1975 .map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
1976
1977 let accepts_ranges = response
1979 .headers()
1980 .get("accept-ranges")
1981 .map(|v| v.to_str().unwrap_or("none"))
1982 .unwrap_or("none");
1983
1984 Ok(accepts_ranges == "bytes")
1985 }
1986
1987 async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
1989 let cache_dir = self.UpdateManager.GetCacheDirectory();
1990 let rollback_dir = cache_dir.join("rollback");
1991
1992 if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
1994 return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
1995 }
1996
1997 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
1999 let marker_content = format!(
2000 "version={}\ntimestamp={}\nrollback_available=true",
2001 version,
2002 chrono::Utc::now().to_rfc3339()
2003 );
2004
2005 if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2006 return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2007 }
2008
2009 info!(
2010 "[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2011 version, backup_file
2012 );
2013
2014 Ok(())
2015 }
2016
2017 async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2019 let cache_dir = self.UpdateManager.GetCacheDirectory();
2020 let rollback_dir = cache_dir.join("rollback");
2021 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2022
2023 if backup_file.exists() {
2024 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2025 return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2026 }
2027 info!("[AirVinegRPCService] Rollback backup cleaned up for version {}", version);
2028 }
2029
2030 Ok(())
2031 }
2032
2033 async fn perform_rollback(&self, version:&str) -> Result<()> {
2035 let cache_dir = self.UpdateManager.GetCacheDirectory();
2036 let rollback_dir = cache_dir.join("rollback");
2037 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2038
2039 if !backup_file.exists() {
2040 return Err(AirError::FileSystem(format!(
2041 "Rollback backup not found for version {}",
2042 version
2043 )));
2044 }
2045
2046 log::info!("[AirVinegRPCService] Starting rollback for version {}", version);
2047
2048 let marker_content = tokio::fs::read_to_string(&backup_file)
2050 .await
2051 .map_err(|e| format!("Failed to read backup marker: {}", e))?;
2052
2053 let mut timestamp = None;
2055 let mut rollback_available = false;
2056
2057 for line in marker_content.lines() {
2058 if let Some(value) = line.strip_prefix("timestamp=") {
2059 timestamp = Some(value.to_string());
2060 } else if line == "rollback_available=true" {
2061 rollback_available = true;
2062 }
2063 }
2064
2065 if !rollback_available {
2066 return Err(AirError::Validation("Rollback not available for this version".to_string()));
2067 }
2068
2069 log::info!(
2076 "[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2077 version,
2078 timestamp
2079 );
2080
2081 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2083 log::warn!("[AirVinegRPCService] Failed to cleanup backup marker after rollback: {}", e);
2084 }
2085
2086 Ok(())
2087 }
2088}
2089
2090fn match_url_scheme(url:&str) -> bool {
2092 url.to_lowercase().starts_with("http://") || url.to_lowercase().starts_with("https://")
2093}
2094
2095fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2097 use sha2::{Digest, Sha256};
2098 let mut hasher = Sha256::new();
2099 hasher.update(chunk);
2100 format!("{:x}", hasher.finalize())
2101}
2102
2103async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2105 use sha2::{Digest, Sha256};
2106 use tokio::io::AsyncReadExt;
2107
2108 let mut file = tokio::fs::File::open(path)
2109 .await
2110 .map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2111
2112 let mut hasher = Sha256::new();
2113 let mut buffer = vec![0u8; 8192];
2114
2115 loop {
2116 let bytes_read = file
2117 .read(&mut buffer)
2118 .await
2119 .map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2120
2121 if bytes_read == 0 {
2122 break;
2123 }
2124
2125 hasher.update(&buffer[..bytes_read]);
2126 }
2127
2128 let result = hasher.finalize();
2129 Ok(format!("{:x}", result))
2130}