1use std::{
104 collections::{HashMap, VecDeque},
105 path::{Path, PathBuf},
106 sync::Arc,
107 time::{Duration, Instant},
108};
109
110use serde::{Deserialize, Serialize};
111use tokio::sync::{RwLock, Semaphore};
112
113use crate::{AirError, ApplicationState::ApplicationState, Configuration::ConfigurationManager, Result, Utility};
114
/// Coordinates file downloads: tracks per-download status records, holds the
/// pending queue, and owns the shared HTTP client, limiters and statistics.
pub struct DownloadManager {
	/// Shared application state; read for downloader configuration and used
	/// to report the service status in `new`.
	AppState:Arc<ApplicationState>,

	/// Status records keyed by download ID.
	ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,

	/// Downloads that have been queued but not yet started.
	DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,

	/// Directory a download is written to when the caller gives no destination.
	CacheDirectory:PathBuf,

	/// HTTP client shared by every download.
	client:reqwest::Client,

	/// Computes SHA-256 digests of downloaded files.
	ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,

	/// Semaphore created with 100 permits in `new`; it is not acquired
	/// anywhere in the visible part of this file.
	BandwidthLimiter:Arc<Semaphore>,

	/// Byte-rate limiter consulted for every chunk received from the network.
	TokenBucket:Arc<RwLock<TokenBucket>>,

	/// Bounds how many transfers run at once (5 permits in `new`).
	ConcurrentLimiter:Arc<Semaphore>,

	/// Aggregate counters across all downloads.
	statistics:Arc<RwLock<DownloadStatistics>>,
}
147
/// Snapshot of a single download as stored in the manager's status map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatus {
	/// Identifier the download is tracked under.
	pub DownloadId:String,
	/// Source URL (already sanitized when registered by the manager).
	pub url:String,
	/// Final path of the downloaded file.
	pub destination:PathBuf,
	/// Registered as 0; not updated in the visible part of this file.
	pub TotalSize:u64,
	/// Registered as 0; not updated in the visible part of this file.
	pub downloaded:u64,
	/// Completion percentage in the range 0.0..=100.0.
	pub progress:f32,
	/// Current lifecycle state.
	pub status:DownloadState,
	/// Message of the most recent failure, if any.
	pub error:Option<String>,
	/// Time the download was registered.
	pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,
	/// Time the download reached `Completed` or `Failed`.
	pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,
	/// Registered as 0; not updated in the visible part of this file.
	pub ChunksCompleted:usize,
	/// Registered as 1; not updated in the visible part of this file.
	pub TotalChunks:usize,
	/// Most recently computed transfer rate in bytes per second.
	pub DownloadRateBytesPerSec:u64,
	/// Checksum supplied by the caller, if any.
	pub ExpectedChecksum:Option<String>,
	/// Checksum computed from the downloaded file once the transfer finishes.
	pub ActualChecksum:Option<String>,
}
167
/// Lifecycle state of a download.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DownloadState {
	/// Initial state assigned when the download is registered.
	Pending,
	/// Not assigned anywhere in the visible part of this file.
	Queued,
	/// Bytes are being transferred.
	Downloading,
	/// Transfer finished; the checksum is being compared with the expected one.
	Verifying,
	/// Finished successfully.
	Completed,
	/// Finished with an error.
	Failed,
	/// Cancelled by a caller; the transfer loop aborts when it observes this.
	Cancelled,
	/// Paused by a caller; the transfer loop polls until the state changes.
	Paused,
	/// Transitional state set briefly by `ResumeDownload` before `Downloading`.
	Resuming,
}
181
/// Scheduling priority of a queued download.
///
/// The explicit discriminants make the derived ordering rank
/// `High > Normal > Low > Background`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadPriority {
	/// Highest priority (3).
	High = 3,
	/// Default priority (2).
	Normal = 2,
	/// Below-normal priority (1).
	Low = 1,
	/// Lowest priority (0).
	Background = 0,
}
190
/// Entry in the pending-download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedDownload {
	/// Identifier handed back to the caller when the download is queued.
	DownloadId:String,
	/// Source URL as supplied by the caller.
	url:String,
	/// Resolved destination path.
	destination:PathBuf,
	/// Expected checksum as supplied by the caller.
	checksum:String,
	/// Priority used to order the queue (higher first).
	priority:DownloadPriority,
	/// Enqueue time; breaks ties between equal priorities (older first).
	AddedAt:chrono::DateTime<chrono::Utc>,
	/// Optional upper bound on the file size in bytes.
	MaxFileSize:Option<u64>,
	/// Whether free disk space should be checked before downloading.
	ValidateDiskSpace:bool,
}
203
/// Outcome of a successful download.
#[derive(Debug, Clone)]
pub struct DownloadResult {
	/// Final path of the file (lossy UTF-8 rendering).
	pub path:String,
	/// Number of bytes in the file, including any resumed portion.
	pub size:u64,
	/// SHA-256 digest computed from the file on disk.
	pub checksum:String,
	/// Wall-clock time of the download; filled in by `DownloadFileWithConfig`.
	pub duration:Duration,
	/// Always 0 in the visible part of this file.
	pub AverageRate:u64,
}
213
/// Aggregate counters maintained by the download manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatistics {
	/// Downloads that have finished, successfully or not.
	pub TotalDownloads:u64,
	/// Downloads that finished successfully.
	pub SuccessfulDownloads:u64,
	/// Downloads that finished with an error.
	pub FailedDownloads:u64,
	/// Downloads cancelled through `CancelDownload`.
	pub CancelledDownloads:u64,
	/// Bytes delivered by successful downloads.
	pub TotalBytesDownloaded:u64,
	/// Summed wall-clock seconds of successful downloads.
	pub TotalDownloadTimeSecs:f64,
	/// `TotalBytesDownloaded / TotalDownloadTimeSecs`, in bytes per second.
	pub AverageDownloadRate:f64,
	/// Highest whole-download rate observed, in bytes per second.
	pub PeakDownloadRate:u64,
	/// Downloads registered but not yet finished or cancelled.
	pub ActiveDownloads:usize,
	/// Incremented each time a download is queued.
	pub QueuedDownloads:usize,
}
228
/// Shareable, thread-safe callback that receives a `DownloadStatus` by value.
pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
231
/// Token-bucket state used to throttle download bandwidth; one token is one byte.
#[derive(Debug)]
struct TokenBucket {
	/// Tokens (bytes) currently available.
	tokens:f64,

	/// Maximum number of tokens the bucket can hold.
	capacity:f64,

	/// Tokens added per second.
	refill_rate:f64,

	/// Instant of the most recent refill.
	last_refill:Instant,
}
253
254impl TokenBucket {
255 fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
257 let refill_rate = bytes_per_sec as f64;
258 let capacity = refill_rate * capacity_factor; Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
261 }
262
263 fn refill(&mut self) {
265 let elapsed = self.last_refill.elapsed().as_secs_f64();
266 if elapsed > 0.0 {
267 let new_tokens = elapsed * self.refill_rate;
268 self.tokens = (self.tokens + new_tokens).min(self.capacity);
269 self.last_refill = Instant::now();
270 }
271 }
272
273 #[allow(dead_code)]
276 fn try_consume(&mut self, bytes:u64) -> u64 {
277 self.refill();
278
279 let bytes = bytes as f64;
280 if self.tokens >= bytes {
281 self.tokens -= bytes;
282 return bytes as u64;
283 }
284
285 let available = self.tokens;
287 self.tokens = 0.0;
288 available as u64
289 }
290
291 async fn consume(&mut self, bytes:u64) -> Result<()> {
293 let bytes_needed = bytes as f64;
294
295 loop {
296 self.refill();
297
298 if self.tokens >= bytes_needed {
299 self.tokens -= bytes_needed;
300 return Ok(());
301 }
302
303 let tokens_needed = bytes_needed - self.tokens;
305 let wait_duration = tokens_needed / self.refill_rate;
306
307 let sleep_duration = Duration::from_secs_f64(wait_duration.min(0.1));
309 tokio::time::sleep(sleep_duration).await;
310 }
311 }
312
313 fn set_rate(&mut self, bytes_per_sec:u64) {
315 self.refill_rate = bytes_per_sec as f64;
316 self.capacity = self.refill_rate * 5.0; }
318}
319
/// Per-download options accepted by `DownloadFileWithConfig`.
#[derive(Debug, Clone)]
pub struct DownloadConfig {
	/// Source URL; must be http or https.
	pub url:String,
	/// Destination path; when empty a name derived from the URL is placed in
	/// the cache directory.
	pub destination:String,
	/// Expected checksum; when empty no verification is performed.
	pub checksum:String,
	/// Optional upper bound on the file size in bytes.
	pub MaxFileSize:Option<u64>,
	/// Not read in the visible part of this file — presumably the intended
	/// transfer chunk size in bytes; confirm against the rest of the module.
	pub ChunkSize:usize,
	/// Maximum number of retries after the first attempt.
	pub MaxRetries:u32,
	/// Per-request timeout in seconds.
	pub TimeoutSecs:u64,
	/// Scheduling priority.
	pub priority:DownloadPriority,
	/// Whether free disk space is checked before downloading.
	pub ValidateDiskSpace:bool,
}
333
334impl Default for DownloadConfig {
335 fn default() -> Self {
336 Self {
337 url:String::new(),
338 destination:String::new(),
339 checksum:String::new(),
340 MaxFileSize:None,
341 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
343 TimeoutSecs:300,
344 priority:DownloadPriority::Normal,
345 ValidateDiskSpace:true,
346 }
347 }
348}
349
350impl DownloadManager {
351 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
353 let config = &AppState.Configuration.Downloader;
354
355 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
357
358 let CacheDirectoryClone = CacheDirectory.clone();
360
361 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
363
364 tokio::fs::create_dir_all(&CacheDirectory)
366 .await
367 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
368
369 let client = reqwest::Client::builder()
371 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
372 .connect_timeout(Duration::from_secs(30))
373 .pool_idle_timeout(Duration::from_secs(90))
374 .pool_max_idle_per_host(10)
375 .tcp_keepalive(Duration::from_secs(60))
376 .user_agent("Land-AirDownloader/0.1.0")
377 .build()
378 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?;
379
380 let BandwidthLimiter = Arc::new(Semaphore::new(100));
382
383 let TokenBucket = Arc::new(RwLock::new(TokenBucket::new(100 * 1024 * 1024, 5.0)));
385
386 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
388
389 let manager = Self {
390 AppState,
391 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
392 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
393 CacheDirectory:CacheDirectoryCloneForInit,
394 client,
395 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
396 BandwidthLimiter,
397 TokenBucket,
398 ConcurrentLimiter,
399 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
400 };
401
402 manager
404 .AppState
405 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
406 .await
407 .map_err(|e| AirError::Internal(e.to_string()))?;
408
409 log::info!(
410 "[DownloadManager] Initialized with cache directory: {}",
411 CacheDirectory.display()
412 );
413
414 Ok(manager)
415 }
416
417 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
419 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
420 .await
421 }
422
423 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
425 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
427
428 let DownloadId = Utility::GenerateRequestId();
430
431 log::info!(
432 "[DownloadManager] Starting download [ID: {}] - URL: {}",
433 DownloadId,
434 SanitizedUrl
435 );
436
437 if SanitizedUrl.is_empty() {
439 return Err(AirError::Network("URL cannot be empty".to_string()));
440 }
441
442 let Destination = if config.destination.is_empty() {
444 let Filename = SanitizedUrl
446 .split('/')
447 .last()
448 .and_then(|s| s.split('?').next())
449 .unwrap_or("download.bin");
450 self.CacheDirectory.join(Filename)
451 } else {
452 ConfigurationManager::ExpandPath(&config.destination)?
453 };
454
455 Utility::ValidateFilePath(
457 Destination
458 .to_str()
459 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
460 )?;
461
462 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
464
465 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
467 .await?;
468
469 if config.ValidateDiskSpace {
471 if let Some(MaxSize) = config.MaxFileSize {
472 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
473 } else {
474 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
476 }
477
478 if let Some(Parent) = Destination.parent() {
480 tokio::fs::create_dir_all(Parent)
481 .await
482 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
483 }
484
485 let StartTime = Instant::now();
486
487 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
489
490 let Duration = StartTime.elapsed();
491
492 match Result {
493 Ok(mut FileInfo) => {
494 FileInfo.duration = Duration;
495
496 self.UpdateStatistics(true, FileInfo.size, Duration).await;
498
499 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
500 .await?;
501
502 log::info!(
503 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
504 DownloadId,
505 FileInfo.size,
506 Duration.as_secs_f64(),
507 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
508 );
509
510 Ok(FileInfo)
511 },
512 Err(E) => {
513 self.UpdateStatistics(false, 0, Duration).await;
515
516 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
517 .await?;
518
519 if Destination.exists() {
521 let _ = tokio::fs::remove_file(&Destination).await;
522 log::warn!("[DownloadManager] Cleaned up failed download: {}", Destination.display());
523 }
524
525 log::error!("[DownloadManager] Download failed [ID: {}] - Error: {}", DownloadId, E);
526
527 Err(E)
528 },
529 }
530 }
531
532 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
534 let url = url.trim();
535
536 if url.is_empty() {
538 return Err(AirError::Network("URL cannot be empty".to_string()));
539 }
540
541 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
543
544 match parsed.scheme() {
546 "http" | "https" => (),
547 scheme => {
548 return Err(AirError::Network(format!(
549 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
550 scheme
551 )));
552 },
553 }
554
555 if parsed.host().is_none() {
557 return Err(AirError::Network("URL must have a valid host".to_string()));
558 }
559
560 #[cfg(debug_assertions)]
562 {
563 }
565 #[cfg(not(debug_assertions))]
566 {
567 if let Some(host) = parsed.host_str() {
568 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
569 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
570 }
571 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
572 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
573 }
574 }
575 }
576
577 let mut sanitized = parsed.clone();
579
580 if sanitized.password().is_some() {
582 sanitized.set_password(Some("")).ok();
583 }
584
585 Ok(sanitized.to_string())
586 }
587
588 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
590 let DestPath = if destination.is_absolute() {
592 destination.to_path_buf()
593 } else {
594 std::env::current_dir()
595 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
596 .join(destination)
597 };
598
599 let MountPoint = self.FindMountPoint(&DestPath)?;
601
602 log::debug!(
605 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
606 url,
607 RequiredBytes,
608 MountPoint.display()
609 );
610
611 #[cfg(unix)]
612 {
613 match self.GetDiskStatvfs(&MountPoint) {
614 Ok((AvailableBytes, TotalBytes)) => {
615 if AvailableBytes < RequiredBytes {
616 log::warn!(
617 "[DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
618 AvailableBytes,
619 RequiredBytes
620 );
621 return Err(AirError::FileSystem(format!(
622 "Insufficient disk space: {} bytes available, {} bytes required",
623 AvailableBytes, RequiredBytes
624 )));
625 }
626
627 log::debug!(
628 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
629 AvailableBytes,
630 RequiredBytes,
631 TotalBytes
632 );
633 },
634 Err(e) => {
635 log::warn!("[DownloadManager] Failed to check disk space: {}, proceeding anyway", e);
636 },
637 }
638 }
639
640 #[cfg(windows)]
641 {
642 match self.GetDiskSpaceWindows(&MountPoint) {
643 Ok(AvailableBytes) => {
644 if AvailableBytes < RequiredBytes {
645 log::warn!(
646 "[DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
647 AvailableBytes,
648 RequiredBytes
649 );
650 return Err(AirError::FileSystem(format!(
651 "Insufficient disk space: {} bytes available, {} bytes required",
652 available_bytes, RequiredBytes
653 )));
654 }
655 log::debug!(
656 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
657 available_bytes,
658 RequiredBytes
659 );
660 },
661 Err(e) => {
662 log::warn!("[DownloadManager] Failed to check disk space: {}, proceeding anyway", e);
663 },
664 }
665 }
666
667 #[cfg(not(any(unix, windows)))]
668 {
669 log::warn!("[DownloadManager] Disk space validation not available on this platform");
670 }
671
672 Ok(())
673 }
674
675 #[cfg(unix)]
677 fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
678 use std::{ffi::CString, os::unix::ffi::OsStrExt};
679
680 log::debug!("[DownloadManager] Checking disk space at: {}", path.display());
681
682 let path_cstr = CString::new(path.as_os_str().as_bytes())
684 .map_err(|e| AirError::FileSystem(format!("Failed to convert path to C string: {}", e)))?;
685
686 let mut stat:libc::statvfs = unsafe { std::mem::zeroed() };
688 let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stat) };
689
690 if result != 0 {
691 let err = std::io::Error::last_os_error();
692 return Err(AirError::FileSystem(format!("Failed to get disk stats: {}", err)));
693 }
694
695 let fragment_size = stat.f_frsize as u64;
697 let available_bytes = fragment_size * stat.f_bavail as u64;
698 let total_bytes = fragment_size * stat.f_blocks as u64;
699
700 log::debug!(
701 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
702 path.display(),
703 available_bytes,
704 total_bytes
705 );
706
707 Ok((available_bytes, total_bytes))
708 }
709
	/// Queries `GetDiskFreeSpaceExW` for `path` and returns the number of
	/// bytes available to the calling user.
	///
	/// # Errors
	///
	/// Returns `AirError::FileSystem` when the Win32 call reports failure.
	#[cfg(windows)]
	fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
		use std::os::windows::ffi::OsStrExt;

		use windows::Win32::Storage::FileSystem::GetDiskFreeSpaceExW;

		log::debug!("[DownloadManager] Checking disk space at: {}", path.display());

		// NUL-terminated UTF-16 copy of the path, as the wide-character API requires.
		let path_str:Vec<u16> = path.as_os_str().encode_wide().chain(std::iter::once(0)).collect();

		let mut free_bytes_available:u64 = 0;
		let mut total_bytes:u64 = 0;
		let mut total_free_bytes:u64 = 0;

		// SAFETY: `path_str` is NUL-terminated and outlives the call; the three
		// out-pointers refer to live, writable `u64` locals.
		let result = unsafe {
			GetDiskFreeSpaceExW(
				windows::core::PCWSTR(path_str.as_ptr()),
				&mut free_bytes_available as *mut _ as _,
				&mut total_bytes as *mut _ as _,
				&mut total_free_bytes as *mut _ as _,
			)
		};

		if !result.as_bool() {
			let err = std::io::Error::last_os_error();
			return Err(AirError::FileSystem(format!("Failed to get disk space: {}", err)));
		}

		log::debug!(
			"[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
			path.display(),
			free_bytes_available,
			total_bytes
		);

		Ok(free_bytes_available)
	}
749
750 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
752 #[cfg(unix)]
753 {
754 let mut current = path
755 .canonicalize()
756 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
757
758 loop {
759 if current.as_os_str().is_empty() || current == Path::new("/") {
760 return Ok(PathBuf::from("/"));
761 }
762
763 let metadata = std::fs::metadata(¤t)
764 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
765
766 #[cfg(unix)]
768 let CurrentDevice = {
769 use std::os::unix::fs::MetadataExt;
770 metadata.dev()
771 };
772 #[cfg(not(unix))]
773 let CurrentDevice = 0u64; let parent = current.parent();
776
777 if let Some(parent_path) = parent {
778 let ParentMetadata = std::fs::metadata(parent_path)
779 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
780
781 #[cfg(unix)]
782 let ParentDevice = {
783 use std::os::unix::fs::MetadataExt;
784 ParentMetadata.dev()
785 };
786 #[cfg(not(unix))]
787 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
790 return Ok(current);
791 }
792 } else {
793 return Ok(current);
794 }
795
796 current.pop();
797 }
798 }
799
800 #[cfg(windows)]
801 {
802 let PathStr = path.to_string_lossy();
804 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
805 return Ok(PathBuf::from(&PathStr[..3]));
806 }
807 Ok(PathBuf::from("C:\\"))
808 }
809
810 #[cfg(not(any(unix, windows)))]
811 {
812 Ok(path.to_path_buf())
813 }
814 }
815
	/// Runs `PerformDownload` until it succeeds, retries are exhausted, the
	/// retry budget is spent, the circuit breaker refuses, or the download is
	/// cancelled.
	///
	/// After a successful transfer the file is verified against the expected
	/// checksum (when one is configured); a mismatch counts as a failed
	/// attempt and is retried under the same rules.
	///
	/// # Errors
	///
	/// Returns the last transfer error, or `AirError::Network` when the
	/// circuit breaker is open, the download was cancelled, or checksum
	/// verification kept failing.
	async fn DownloadWithRetry(
		&self,
		DownloadId:&str,
		url:&str,
		destination:&PathBuf,
		config:&DownloadConfig,
	) -> Result<DownloadResult> {
		// Exponential backoff: 1 s initial interval, doubling up to 32 s, 10% jitter.
		let RetryPolicy = crate::Resilience::RetryPolicy {
			MaxRetries:config.MaxRetries,
			InitialIntervalMs:1000,
			MaxIntervalMs:32000,
			BackoffMultiplier:2.0,
			JitterFactor:0.1,
			BudgetPerMinute:100,
			ErrorClassification:std::collections::HashMap::new(),
		};

		// Both helpers are created per call, so their state is scoped to this download.
		let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());
		let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
			"downloader".to_string(),
			crate::Resilience::CircuitBreakerConfig::default(),
		);

		// Number of retries performed so far (0 during the first attempt).
		let mut attempt = 0;

		loop {
			// An open breaker blocks the attempt unless it agrees to a recovery probe.
			if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
				if !CircuitBreaker.AttemptRecovery().await {
					return Err(AirError::Network(
						"Circuit breaker is open, too many recent failures".to_string(),
					));
				}
			}

			// Do not start (or restart) a transfer that has been cancelled.
			if let Some(status) = self.GetDownloadStatus(DownloadId).await {
				if status.status == DownloadState::Cancelled {
					return Err(AirError::Network("Download cancelled".to_string()));
				}
			}

			match self.PerformDownload(DownloadId, url, destination, config).await {
				Ok(file_info) => {
					// `ExpectedChecksumFromConfig` is defined outside the visible part of
					// this file — presumably it yields `None` for an empty checksum; confirm.
					if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
						self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
							.await?;

						if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
							log::warn!("[DownloadManager] Checksum verification failed [ID: {}]: {}", DownloadId, e);
							CircuitBreaker.RecordFailure().await;

							if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
								attempt += 1;
								let delay = RetryManager.CalculateRetryDelay(attempt);
								// `attempt + 1` is the 1-based number of the upcoming attempt.
								log::info!(
									"[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
									DownloadId,
									attempt + 1,
									config.MaxRetries + 1,
									delay
								);
								tokio::time::sleep(delay).await;
								continue;
							} else {
								return Err(AirError::Network(format!(
									"Checksum verification failed after {} retries: {}",
									attempt, e
								)));
							}
						}
					}

					CircuitBreaker.RecordSuccess().await;
					return Ok(file_info);
				},
				Err(e) => {
					CircuitBreaker.RecordFailure().await;

					if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
						attempt += 1;
						log::warn!(
							"[DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
							DownloadId,
							attempt + 1,
							config.MaxRetries + 1,
							e
						);

						let delay = RetryManager.CalculateRetryDelay(attempt);
						tokio::time::sleep(delay).await;
					} else {
						return Err(e);
					}
				},
			}
		}
	}
916
917 async fn PerformDownload(
919 &self,
920 DownloadId:&str,
921 url:&str,
922 destination:&PathBuf,
923 config:&DownloadConfig,
924 ) -> Result<DownloadResult> {
925 let _concurrent_permit = self
927 .ConcurrentLimiter
928 .acquire()
929 .await
930 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
931
932 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
933 .await?;
934
935 let TempDestination = destination.with_extension("tmp");
937
938 let mut ExistingSize:u64 = 0;
940 if TempDestination.exists() {
941 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
942 ExistingSize = metadata.len();
943 log::info!("[DownloadManager] Resuming download from {} bytes", ExistingSize);
944 }
945 }
946
947 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
949 if ExistingSize > 0 {
950 let RangeHeader = format!("bytes={}-", ExistingSize);
951 req = req.header(reqwest::header::RANGE, RangeHeader);
952 req = req.header(reqwest::header::IF_MATCH, "*"); }
954
955 let response = req
956 .send()
957 .await
958 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
959
960 let FinalUrl = response.url().clone();
962 let response = if FinalUrl.as_str() != url {
963 log::info!("[DownloadManager] Redirected to: {}", FinalUrl);
964 response
965 } else {
966 response
967 };
968
969 let StatusCode = response.status();
971 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
972 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
973 }
974
975 let TotalSize = if let Some(cl) = response.content_length() {
977 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
978 cl + ExistingSize
979 } else {
980 cl
981 }
982 } else {
983 0
984 };
985
986 if let Some(max_size) = config.MaxFileSize {
988 if TotalSize > 0 && TotalSize > max_size {
989 return Err(AirError::Network(format!(
990 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
991 TotalSize, max_size
992 )));
993 }
994 }
995
996 let mut file = tokio::fs::OpenOptions::new()
998 .create(true)
999 .append(true)
1000 .open(&TempDestination)
1001 .await
1002 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
1003
1004 use tokio::io::AsyncWriteExt;
1005 use futures_util::StreamExt;
1006
1007 let mut downloaded = ExistingSize;
1008 let mut LastProgressUpdate = Instant::now();
1009 let BytesStream = response.bytes_stream();
1010
1011 tokio::pin!(BytesStream);
1012
1013 while let Some(result) = BytesStream.next().await {
1014 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1016 match status.status {
1017 DownloadState::Cancelled => {
1018 let _ = tokio::fs::remove_file(&TempDestination).await;
1020 return Err(AirError::Network("Download cancelled".to_string()));
1021 },
1022 DownloadState::Paused => {
1023 loop {
1025 tokio::time::sleep(Duration::from_millis(250)).await;
1026 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
1027 match s.status {
1028 DownloadState::Paused => continue,
1029 DownloadState::Cancelled => {
1030 let _ = tokio::fs::remove_file(&TempDestination).await;
1031 return Err(AirError::Network("Download cancelled".to_string()));
1032 },
1033 _ => {
1034 log::info!("[DownloadManager] Resuming paused download [ID: {}]", DownloadId);
1035 break;
1036 },
1037 }
1038 } else {
1039 break;
1040 }
1041 }
1042 },
1043 _ => {},
1044 }
1045 }
1046
1047 match result {
1048 Ok(chunk) => {
1049 let ChunkSize = chunk.len();
1051 {
1052 let mut bucket = self.TokenBucket.write().await;
1053 if let Err(e) = bucket.consume(ChunkSize as u64).await {
1054 log::warn!("[DownloadManager] Bandwidth throttling error: {}, continuing anyway", e);
1055 }
1056 }
1057
1058 file.write_all(&chunk)
1059 .await
1060 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
1061
1062 downloaded += ChunkSize as u64;
1063
1064 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
1066 LastProgressUpdate = Instant::now();
1067
1068 if TotalSize > 0 {
1069 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
1070 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
1071 .await?;
1072 }
1073
1074 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
1076 self.UpdateDownloadRate(DownloadId, rate).await;
1077 }
1078 },
1079 Err(e) => {
1080 if e.is_timeout() || e.is_connect() {
1082 log::warn!("[DownloadManager] Connection/timeout error, may retry: {}", e);
1083 return Err(AirError::Network(format!("Network error: {}", e)));
1084 }
1085 return Err(AirError::Network(format!("Failed to read response: {}", e)));
1086 },
1087 }
1088 }
1089
1090 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
1092 .await?;
1093
1094 file.flush()
1096 .await
1097 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1098
1099 tokio::fs::rename(&TempDestination, destination)
1101 .await
1102 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
1103
1104 let checksum = self.CalculateChecksum(destination).await?;
1106
1107 self.UpdateActualChecksum(DownloadId, &checksum).await;
1109
1110 Ok(DownloadResult {
1111 path:destination.to_string_lossy().to_string(),
1112 size:downloaded,
1113 checksum,
1114 duration:Duration::from_secs(0),
1115 AverageRate:0,
1116 })
1117 }
1118
1119 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
1121 if !FilePath.exists() {
1123 return Err(AirError::FileSystem(format!(
1124 "File not found for checksum verification: {}",
1125 FilePath.display()
1126 )));
1127 }
1128
1129 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
1130
1131 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
1133 let NormalizedActual = ActualChecksum.trim().to_lowercase();
1134
1135 if NormalizedActual != NormalizedExpected {
1136 log::error!(
1137 "[DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1138 FilePath.display(),
1139 NormalizedExpected,
1140 NormalizedActual
1141 );
1142 return Err(AirError::Network(format!(
1143 "Checksum verification failed: expected {}, got {}",
1144 NormalizedExpected, NormalizedActual
1145 )));
1146 }
1147
1148 log::info!("[DownloadManager] Checksum verified for file: {}", FilePath.display());
1149
1150 Ok(())
1151 }
1152
1153 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1155 if !FilePath.exists() {
1157 return Err(AirError::FileSystem(format!(
1158 "File not found for checksum calculation: {}",
1159 FilePath.display()
1160 )));
1161 }
1162
1163 self.ChecksumVerifier.CalculateSha256(FilePath).await
1164 }
1165
1166 async fn RegisterDownload(
1168 &self,
1169 DownloadId:&str,
1170 url:&str,
1171 destination:&PathBuf,
1172 ExpectedChecksum:Option<String>,
1173 ) -> Result<()> {
1174 let mut downloads = self.ActiveDownloads.write().await;
1175 let mut stats = self.statistics.write().await;
1176
1177 stats.ActiveDownloads += 1;
1178
1179 downloads.insert(
1180 DownloadId.to_string(),
1181 DownloadStatus {
1182 DownloadId:DownloadId.to_string(),
1183 url:url.to_string(),
1184 destination:destination.clone(),
1185 TotalSize:0,
1186 downloaded:0,
1187 progress:0.0,
1188 status:DownloadState::Pending,
1189 error:None,
1190 StartedAt:Some(chrono::Utc::now()),
1191 CompletedAt:None,
1192 ChunksCompleted:0,
1193 TotalChunks:1,
1194 DownloadRateBytesPerSec:0,
1195 ExpectedChecksum:ExpectedChecksum.clone(),
1196 ActualChecksum:None,
1197 },
1198 );
1199
1200 Ok(())
1201 }
1202
1203 async fn UpdateDownloadStatus(
1205 &self,
1206 DownloadId:&str,
1207 status:DownloadState,
1208 progress:Option<f32>,
1209 error:Option<String>,
1210 ) -> Result<()> {
1211 let mut downloads = self.ActiveDownloads.write().await;
1212
1213 if let Some(download) = downloads.get_mut(DownloadId) {
1214 if status == DownloadState::Completed || status == DownloadState::Failed {
1215 download.CompletedAt = Some(chrono::Utc::now());
1216 }
1217 download.status = status;
1218 if let Some(progress) = progress {
1219 download.progress = progress;
1220 }
1221 download.error = error;
1222 }
1223
1224 Ok(())
1225 }
1226
1227 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1229 let mut downloads = self.ActiveDownloads.write().await;
1230 if let Some(download) = downloads.get_mut(DownloadId) {
1231 download.DownloadRateBytesPerSec = rate;
1232 }
1233 }
1234
1235 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1237 let mut downloads = self.ActiveDownloads.write().await;
1238 if let Some(download) = downloads.get_mut(DownloadId) {
1239 download.ActualChecksum = Some(checksum.to_string());
1240 }
1241 }
1242
1243 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1245 let downloads = self.ActiveDownloads.read().await;
1246 if let Some(download) = downloads.get(DownloadId) {
1247 if let Some(StartedAt) = download.StartedAt {
1248 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1249 let ElapsedSecs = elapsed.num_seconds() as u64;
1250 if ElapsedSecs > 0 {
1251 return CurrentBytes / ElapsedSecs;
1252 }
1253 }
1254 }
1255 0
1256 }
1257
1258 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1260 let mut stats = self.statistics.write().await;
1261
1262 if success {
1263 stats.SuccessfulDownloads += 1;
1264 stats.TotalBytesDownloaded += bytes;
1265 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1266
1267 if stats.TotalDownloadTimeSecs > 0.0 {
1268 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1269 }
1270
1271 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1273 (bytes as f64 / duration.as_secs_f64()) as u64
1274 } else {
1275 0
1276 };
1277 if CurrentRate > stats.PeakDownloadRate {
1278 stats.PeakDownloadRate = CurrentRate;
1279 }
1280 } else {
1281 stats.FailedDownloads += 1;
1282 }
1283
1284 stats.TotalDownloads += 1;
1285 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1286 }
1287
1288 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1290 let downloads = self.ActiveDownloads.read().await;
1291 downloads.get(DownloadId).cloned()
1292 }
1293
1294 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1296 let downloads = self.ActiveDownloads.read().await;
1297 downloads.values().cloned().collect()
1298 }
1299
1300 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1302 log::info!("[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1303
1304 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1305 .await?;
1306
1307 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1309 let TempPath = status.destination.with_extension("tmp");
1310 if TempPath.exists() {
1311 let _ = tokio::fs::remove_file(&TempPath).await;
1312 }
1313 }
1314
1315 {
1317 let mut stats = self.statistics.write().await;
1318 stats.CancelledDownloads += 1;
1319 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1320 }
1321
1322 Ok(())
1323 }
1324
1325 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1327 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1328 log::info!("[DownloadManager] Download paused [ID: {}]", DownloadId);
1329 Ok(())
1330 }
1331
1332 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1334 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1335 if status.status == DownloadState::Paused {
1336 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1337 .await?;
1338 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1340 .await?;
1341 log::info!("[DownloadManager] Download resumed [ID: {}]", DownloadId);
1342 } else {
1343 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1344 }
1345 } else {
1346 return Err(AirError::Network("Download not found".to_string()));
1347 }
1348 Ok(())
1349 }
1350
1351 pub async fn GetActiveDownloadCount(&self) -> usize {
1353 let downloads = self.ActiveDownloads.read().await;
1354 downloads
1355 .iter()
1356 .filter(|(_, s)| {
1357 matches!(
1358 s.status,
1359 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1360 )
1361 })
1362 .count()
1363 }
1364
1365 pub async fn GetStatistics(&self) -> DownloadStatistics {
1367 let stats = self.statistics.read().await;
1368 stats.clone()
1369 }
1370
1371 pub async fn QueueDownload(
1373 &self,
1374 url:String,
1375 destination:String,
1376 checksum:String,
1377 priority:DownloadPriority,
1378 ) -> Result<String> {
1379 let DownloadId = Utility::GenerateRequestId();
1380
1381 let destination = if destination.is_empty() {
1382 let filename = url.split('/').last().unwrap_or("download.bin");
1383 self.CacheDirectory.join(filename)
1384 } else {
1385 ConfigurationManager::ExpandPath(&destination)?
1386 };
1387
1388 let queued_download = QueuedDownload {
1389 DownloadId:DownloadId.clone(),
1390 url,
1391 destination,
1392 checksum,
1393 priority,
1394 AddedAt:chrono::Utc::now(),
1395 MaxFileSize:None,
1396 ValidateDiskSpace:true,
1397 };
1398
1399 let mut queue = self.DownloadQueue.write().await;
1400 queue.push_back(queued_download);
1401
1402 queue.make_contiguous().sort_by(|a, b| {
1404 match b.priority.cmp(&a.priority) {
1405 std::cmp::Ordering::Equal => {
1406 a.AddedAt.cmp(&b.AddedAt)
1408 },
1409 order => order,
1410 }
1411 });
1412
1413 {
1414 let mut stats = self.statistics.write().await;
1415 stats.QueuedDownloads += 1;
1416 }
1417
1418 log::info!(
1419 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1420 DownloadId,
1421 priority
1422 );
1423
1424 Ok(DownloadId)
1425 }
1426
1427 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1429 let mut queue = self.DownloadQueue.write().await;
1430
1431 if let Some(queued) = queue.pop_front() {
1432 let download_id = queued.DownloadId.clone();
1433 drop(queue); let config = DownloadConfig {
1436 url:queued.url.clone(),
1437 destination:queued.destination.to_string_lossy().to_string(),
1438 checksum:queued.checksum.clone(),
1439 priority:queued.priority,
1440 MaxFileSize:queued.MaxFileSize,
1441 ValidateDiskSpace:queued.ValidateDiskSpace,
1442 ..Default::default()
1443 };
1444
1445 {
1446 let mut stats = self.statistics.write().await;
1447 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1448 }
1449
1450 let manager = self.clone();
1452 let download_id_clone = download_id.clone();
1453 tokio::spawn(async move {
1454 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1455 log::error!("[DownloadManager] Queued download failed [ID: {}]: {}", download_id_clone, e);
1456 let _ = manager
1458 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1459 .await;
1460 }
1461 });
1462
1463 Ok(Some(download_id))
1464 } else {
1465 Ok(None)
1466 }
1467 }
1468
1469 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1471 let manager = self.clone();
1472
1473 let handle = tokio::spawn(async move {
1474 manager.BackgroundTaskLoop().await;
1475 });
1476
1477 log::info!("[DownloadManager] Background tasks started");
1478
1479 Ok(handle)
1480 }
1481
1482 async fn BackgroundTaskLoop(&self) {
1484 let mut interval = tokio::time::interval(Duration::from_secs(60));
1485
1486 loop {
1487 interval.tick().await;
1488
1489 if let Err(e) = self.ProcessQueue().await {
1491 log::error!("[DownloadManager] Queue processing error: {}", e);
1492 }
1493
1494 self.CleanupCompletedDownloads().await;
1496
1497 if let Err(e) = self.CleanupCache().await {
1499 log::error!("[DownloadManager] Cache cleanup failed: {}", e);
1500 }
1501 }
1502 }
1503
1504 async fn CleanupCompletedDownloads(&self) {
1506 let mut downloads = self.ActiveDownloads.write().await;
1507
1508 let mut cleaned_count = 0;
1509 downloads.retain(|_, download| {
1510 let is_final = matches!(
1511 download.status,
1512 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1513 );
1514 if is_final {
1515 cleaned_count += 1;
1516 }
1517 !is_final
1518 });
1519
1520 if cleaned_count > 0 {
1521 log::debug!("[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1522 }
1523 }
1524
1525 async fn CleanupCache(&self) -> Result<()> {
1527 let max_age_days = 7;
1528 let now = chrono::Utc::now();
1529
1530 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1531 .await
1532 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1533
1534 let mut cleaned_count = 0;
1535
1536 while let Some(entry) = entries
1537 .next_entry()
1538 .await
1539 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1540 {
1541 let metadata = entry
1542 .metadata()
1543 .await
1544 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1545
1546 if metadata.is_file() {
1547 let path = entry.path();
1548
1549 let IsActive = {
1551 let downloads = self.ActiveDownloads.read().await;
1552 downloads.values().any(|d| d.destination == path)
1553 };
1554
1555 if IsActive {
1556 continue;
1557 }
1558
1559 let modified = metadata
1560 .modified()
1561 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1562
1563 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1564 let age = now.signed_duration_since(modified_time);
1565
1566 if age.num_days() > max_age_days {
1567 match tokio::fs::remove_file(&path).await {
1568 Ok(_) => {
1569 log::debug!(
1570 "[DownloadManager] Removed old cache file: {}",
1571 entry.file_name().to_string_lossy()
1572 );
1573 cleaned_count += 1;
1574 },
1575 Err(e) => {
1576 log::warn!(
1577 "[DownloadManager] Failed to remove cache file {}: {}",
1578 entry.file_name().to_string_lossy(),
1579 e
1580 );
1581 },
1582 }
1583 }
1584 }
1585 }
1586
1587 if cleaned_count > 0 {
1588 log::info!("[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1589 }
1590
1591 Ok(())
1592 }
1593
1594 pub async fn StopBackgroundTasks(&self) {
1596 log::info!("[DownloadManager] Stopping background tasks");
1597
1598 let ids_to_cancel:Vec<String> = {
1600 let downloads = self.ActiveDownloads.read().await;
1601 downloads
1602 .iter()
1603 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1604 .map(|(id, _)| id.clone())
1605 .collect()
1606 };
1607
1608 for id in ids_to_cancel {
1610 let _ = self.CancelDownload(&id).await;
1611 }
1612
1613 let _ = self
1615 .AppState
1616 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1617 .await;
1618 }
1619
1620 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1633 let bytes_per_sec = (mb_per_sec.max(1).min(1000) * 1024 * 1024) as u64;
1634
1635 {
1637 let mut bucket = self.TokenBucket.write().await;
1638 bucket.set_rate(bytes_per_sec);
1639 }
1640
1641 let permits = mb_per_sec.max(1).min(1000);
1643 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1644
1645 log::info!(
1646 "[DownloadManager] Bandwidth limit set to {} MB/s ({} bytes/s)",
1647 mb_per_sec,
1648 bytes_per_sec
1649 );
1650 }
1651
1652 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1656 let permits = max.max(1).min(20);
1657 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1658 log::info!("[DownloadManager] Max concurrent downloads set to {}", max);
1659 }
1660}
1661
1662impl Clone for DownloadManager {
1663 fn clone(&self) -> Self {
1664 Self {
1665 AppState:self.AppState.clone(),
1666 ActiveDownloads:self.ActiveDownloads.clone(),
1667 DownloadQueue:self.DownloadQueue.clone(),
1668 CacheDirectory:self.CacheDirectory.clone(),
1669 client:self.client.clone(),
1670 ChecksumVerifier:self.ChecksumVerifier.clone(),
1671 BandwidthLimiter:self.BandwidthLimiter.clone(),
1672 TokenBucket:self.TokenBucket.clone(),
1673 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1674 statistics:self.statistics.clone(),
1675 }
1676 }
1677}
1678
1679impl Default for DownloadStatistics {
1680 fn default() -> Self {
1681 Self {
1682 TotalDownloads:0,
1683 SuccessfulDownloads:0,
1684 FailedDownloads:0,
1685 CancelledDownloads:0,
1686 TotalBytesDownloaded:0,
1687 TotalDownloadTimeSecs:0.0,
1688 AverageDownloadRate:0.0,
1689 PeakDownloadRate:0,
1690 ActiveDownloads:0,
1691 QueuedDownloads:0,
1692 }
1693 }
1694}
1695
1696fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
1698 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
1699}
1700
/// Describes one byte range of a chunked download and where it is staged.
#[derive(Debug, Clone)]
struct ChunkInfo {
    /// Offset of the first byte of the chunk (inclusive); sent as the lower
    /// bound of the HTTP `Range` header.
    start:u64,
    /// Offset of the last byte of the chunk (inclusive); sent as the upper
    /// bound of the HTTP `Range` header.
    end:u64,
    /// Initialised to 0 when the chunk list is built.
    /// NOTE(review): no code in the visible part of this file reads or
    /// updates this field; confirm whether it is still needed.
    downloaded:u64,
    /// File the chunk body is written to before reassembly.
    temp_path:PathBuf,
}
1709
/// Bundles a list of chunk descriptors with a total size.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// reads this type; presumably it is the intended result of a parallel
/// download step. Confirm it is still used before relying on it.
#[derive(Debug)]
struct ParallelDownloadResult {
    /// Chunk descriptors belonging to the download.
    chunks:Vec<ChunkInfo>,
    /// Total size; presumably in bytes, matching `ChunkInfo` offsets (to be
    /// confirmed against the code that fills it).
    total_size:u64,
}
1716
1717impl DownloadManager {
1754 pub async fn DownloadFileWithChunks(
1764 &self,
1765 url:String,
1766 destination:String,
1767 checksum:String,
1768 chunk_size_mb:usize,
1769 ) -> Result<DownloadResult> {
1770 log::info!(
1771 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
1772 url,
1773 chunk_size_mb
1774 );
1775
1776 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
1778
1779 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
1781
1782 log::info!("[DownloadManager] Remote file size: {} bytes", total_size);
1783
1784 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
1787 log::info!("[DownloadManager] File too small for chunked download, using normal download");
1788 return self.DownloadFile(url, destination, checksum).await;
1789 }
1790
1791 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
1793 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
1794 let num_concurrent = num_chunks.min(4); log::info!(
1797 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
1798 num_chunks,
1799 num_concurrent
1800 );
1801
1802 let DownloadId = Utility::GenerateRequestId();
1803 let DestinationPath = if destination.is_empty() {
1804 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
1805 self.CacheDirectory.join(filename)
1806 } else {
1807 ConfigurationManager::ExpandPath(&destination)?
1808 };
1809
1810 let temp_dir = DestinationPath.with_extension("chunks");
1812 tokio::fs::create_dir_all(&temp_dir)
1813 .await
1814 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
1815
1816 let mut chunks = Vec::with_capacity(num_chunks);
1818 for i in 0..num_chunks {
1819 let start = (i as u64) * chunk_size;
1820 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
1821
1822 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
1823 }
1824
1825 let downloaded_tracker = Arc::new(RwLock::new(0u64));
1827 let completed_tracker = Arc::new(RwLock::new(0usize));
1828
1829 let mut handles = Vec::new();
1831 for (i, chunk) in chunks.iter().enumerate() {
1832 let manager = self.clone();
1833 let url_clone = sanitized_url.clone();
1834 let chunk_clone = chunk.clone();
1835 let downloaded_tracker = downloaded_tracker.clone();
1836 let completed_tracker = completed_tracker.clone();
1837 let _Did = DownloadId.clone();
1838
1839 let handle = tokio::spawn(async move {
1840 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
1841
1842 {
1844 let mut downloaded = downloaded_tracker.write().await;
1845 let mut completed = completed_tracker.write().await;
1846 *downloaded += chunk_clone.end - chunk_clone.start + 1;
1847 *completed += 1;
1848
1849 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
1850 log::info!(
1851 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
1852 i + 1,
1853 *completed,
1854 num_chunks,
1855 progress
1856 );
1857 }
1858
1859 Ok::<_, AirError>(())
1860 });
1861
1862 if (i + 1) % num_concurrent == 0 {
1864 for handle in handles.drain(..) {
1865 handle.await??;
1866 }
1867 }
1868
1869 handles.push(handle);
1870 }
1871
1872 for handle in handles {
1874 handle.await??;
1875 }
1876
1877 log::info!("[DownloadManager] Reassembling chunks into final file");
1879 self.ReassembleChunks(&chunks, &DestinationPath).await?;
1880
1881 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
1883 log::warn!("[DownloadManager] Failed to clean up temp directory: {}", e);
1884 AirError::FileSystem(e.to_string())
1885 })?;
1886
1887 if !checksum.is_empty() {
1889 self.VerifyChecksum(&DestinationPath, &checksum).await?;
1890 }
1891
1892 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
1893
1894 log::info!("[DownloadManager] Chunked download completed successfully");
1895
1896 Ok(DownloadResult {
1897 path:DestinationPath.to_string_lossy().to_string(),
1898 size:total_size,
1899 checksum:actual_checksum,
1900 duration:Duration::from_secs(0),
1901 AverageRate:0,
1902 })
1903 }
1904
1905 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
1907 let response = self
1908 .client
1909 .head(url)
1910 .timeout(Duration::from_secs(30))
1911 .send()
1912 .await
1913 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
1914
1915 if !response.status().is_success() {
1916 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
1917 }
1918
1919 response
1920 .content_length()
1921 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
1922 }
1923
1924 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
1926 log::debug!(
1927 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
1928 chunk_index,
1929 chunk.start,
1930 chunk.end
1931 );
1932
1933 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
1934
1935 let response = self
1936 .client
1937 .get(url)
1938 .header(reqwest::header::RANGE, range_header)
1939 .timeout(Duration::from_secs(300))
1940 .send()
1941 .await
1942 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
1943
1944 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
1945 return Err(AirError::Network(format!(
1946 "Chunk download failed with status: {}",
1947 response.status()
1948 )));
1949 }
1950
1951 let bytes = response
1953 .bytes()
1954 .await
1955 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
1956
1957 tokio::fs::write(&chunk.temp_path, &bytes)
1958 .await
1959 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
1960
1961 log::debug!("[DownloadManager] Chunk {} downloaded: {} bytes", chunk_index, bytes.len());
1962
1963 Ok(())
1964 }
1965
1966 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
1968 use tokio::io::AsyncWriteExt;
1969
1970 let mut file = tokio::fs::File::create(destination)
1971 .await
1972 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
1973
1974 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
1976 sorted_chunks.sort_by_key(|c| c.start);
1977
1978 for chunk in sorted_chunks {
1979 let contents = tokio::fs::read(&chunk.temp_path)
1980 .await
1981 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
1982
1983 file.write_all(&contents)
1984 .await
1985 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
1986
1987 log::debug!("[DownloadManager] Reassembled chunk (bytes {}-{})", chunk.start, chunk.end);
1988 }
1989
1990 file.flush()
1991 .await
1992 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1993
1994 log::info!("[DownloadManager] All chunks reassembled successfully");
1995
1996 Ok(())
1997 }
1998}