1use std::{
277 collections::HashMap,
278 sync::{Arc, Mutex},
279 time::{Duration, SystemTime},
280};
281
282use log::{debug, error, info, trace, warn};
283use serde::{Deserialize, Serialize};
284use tokio::time::interval;
285use tauri::{AppHandle, Emitter, Manager, State, command};
286use CommonLibrary::{Environment::Requires::Requires, FileSystem::FileSystemWriter::FileSystemWriter};
287
288use crate::{IPC::AdvancedFeatures::PerformanceStats, RunTime::ApplicationRunTime::ApplicationRunTime};
289
/// Aggregate document counts describing the current synchronization state.
///
/// Serializable: this file emits it as the payload of the
/// `mountain_sync_status_update` event and returns it from the
/// `mountain_get_sync_status` command.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SyncStatus {
	/// Number of documents tracked for synchronization.
	pub total_documents:u32,
	/// Number of tracked documents whose state is `SyncState::Synced`.
	pub synced_documents:u32,
	/// Number of tracked documents whose state is `SyncState::Conflicted`.
	pub conflicted_documents:u32,
	/// Number of tracked documents whose state is `SyncState::Offline`.
	pub offline_documents:u32,
	/// Duration of the last sync pass in milliseconds.
	/// NOTE(review): the code visible in this file only ever writes 0 here.
	pub last_sync_duration_ms:u64,
}
304
/// Synchronization state of a single tracked document.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SyncState {
	/// The document has changes; the periodic sync task looks for this state.
	Modified,
	/// The document is in sync; `add_document` registers documents in this state.
	Synced,
	/// The document is in conflict; `check_for_conflicts` rejects changes to it.
	Conflicted,
	/// The document is offline; counted separately in `SyncStatus`.
	Offline,
}
313
/// Kind of modification carried by a `DocumentChange`.
///
/// `apply_document_change` has dedicated match arms for `Update`, `Insert`
/// and `Delete`; `Move` and `Other` fall into its catch-all arm, which logs
/// an "Unsupported change type" warning.
#[derive(Clone, Copy, Debug)]
pub enum ChangeType {
	/// Update of existing content.
	Update,
	/// Insertion of content.
	Insert,
	/// Deletion.
	Delete,
	/// Move (handled only by the catch-all arm in this file).
	Move,
	/// Any other kind of change (handled only by the catch-all arm in this file).
	Other,
}
323
/// A document tracked by the synchronization service.
#[derive(Clone, Debug)]
pub struct SynchronizedDocument {
	/// Identifier of the document; also used as the key in
	/// `DocumentSynchronization::synchronized_documents`.
	pub document_id:String,
	/// Path of the file backing the document.
	pub file_path:String,
	/// Last modification time; `add_document` stores seconds since the Unix
	/// epoch and `check_for_conflicts` compares against seconds as well.
	pub last_modified:u64,
	/// Hash of the document content; `add_document` initializes it to an
	/// empty string.
	pub content_hash:String,
	/// Current synchronization state.
	pub sync_state:SyncState,
	/// Version counter; `add_document` starts it at 1.
	pub version:u32,
}
334
/// A single change queued against a tracked document.
#[derive(Clone, Debug)]
pub struct DocumentChange {
	/// Identifier of this change; used to locate it in the pending list.
	pub change_id:String,
	/// Identifier of the document the change targets.
	pub document_id:String,
	/// Kind of change.
	pub change_type:ChangeType,
	/// Optional content payload accompanying the change.
	pub content:Option<String>,
	/// Set to `true` by `apply_document_change` once the change was processed.
	pub applied:bool,
}
344
/// Mutable document-synchronization state shared behind a mutex.
pub struct DocumentSynchronization {
	/// Tracked documents keyed by document id.
	pub synchronized_documents:HashMap<String, SynchronizedDocument>,
	/// Queued changes keyed by document id.
	pub pending_changes:HashMap<String, Vec<DocumentChange>>,
	/// Timestamp of the last sync pass.
	/// NOTE(review): `start_sync_task` writes milliseconds since the Unix
	/// epoch while `update_sync_status` writes seconds — the unit is
	/// inconsistent and should be settled.
	pub last_sync_time:u64,
	/// Most recently computed aggregate status.
	pub sync_status:SyncStatus,
}
352
/// A real-time update that is broadcast to the subscribers of its target.
///
/// Serializable: it is emitted as an event payload by `broadcast_updates`.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RealTimeUpdate {
	/// Target name; used as the key into `RealTimeUpdates::subscribers`.
	pub target:String,
	/// Update payload as a string (format not established in this file).
	pub data:String,
}
359
/// Mutable real-time update state shared behind a mutex.
pub struct RealTimeUpdates {
	/// List of updates.
	/// NOTE(review): initialized empty and not otherwise touched in this file.
	pub updates:Vec<RealTimeUpdate>,
	/// Subscriber names keyed by update target.
	pub subscribers:HashMap<String, Vec<String>>,
	/// Updates waiting to be broadcast; filled by `queue_update` and emptied
	/// by `get_pending_updates`.
	pub update_queue:Vec<RealTimeUpdate>,
	/// Seconds since the Unix epoch; despite the name, this file writes it
	/// when an update is queued (`queue_update`), not when it is broadcast.
	pub last_broadcast:u64,
}
367
/// Visibility and zoom settings of the UI.
#[derive(Clone, Debug)]
pub struct ViewState {
	/// Zoom factor; `WindAdvancedSync::new` starts it at 1.0.
	pub zoom_level:f32,
	/// Whether the sidebar is shown (initially `true`).
	pub sidebar_visible:bool,
	/// Whether the panel is shown (initially `true`).
	pub panel_visible:bool,
	/// Whether the status bar is shown (initially `true`).
	pub status_bar_visible:bool,
}
376
/// Dimensions of the editor grid.
#[derive(Clone, Debug)]
pub struct GridLayout {
	/// Number of grid rows (initially 1).
	pub rows:u32,
	/// Number of grid columns (initially 1).
	pub columns:u32,
	/// Width of a cell (initially 100; unit not established in this file).
	pub cell_width:u32,
	/// Height of a cell (initially 100; unit not established in this file).
	pub cell_height:u32,
}
385
/// Editor-group layout of the UI.
#[derive(Clone, Debug)]
pub struct LayoutState {
	/// Editor groups, stored as strings (initially empty).
	pub editor_groups:Vec<String>,
	/// Active group (initially 0).
	/// NOTE(review): presumably an index into `editor_groups` — confirm.
	pub active_group:u32,
	/// Grid dimensions of the layout.
	pub grid_layout:GridLayout,
}
393
/// Snapshot of the UI state kept by the synchronization service.
#[derive(Clone, Debug)]
pub struct UIStateSynchronization {
	/// Currently active editor, if any (initially `None`).
	pub active_editor:Option<String>,
	/// Cursor positions keyed by string.
	/// NOTE(review): presumably (line, column) per editor/document — confirm.
	pub cursor_positions:HashMap<String, (u32, u32)>,
	/// Selection ranges keyed by string.
	/// NOTE(review): presumably (start, end) per editor/document — confirm.
	pub selection_ranges:HashMap<String, (u32, u32)>,
	/// Visibility and zoom settings.
	pub view_state:ViewState,
	/// Theme name (initially `"default"`).
	pub theme:String,
	/// Editor-group layout.
	pub layout:LayoutState,
}
404
/// Synchronization service holding document, UI and real-time update state.
///
/// Every field is an `Arc`, so cloning the service yields a handle that
/// shares the same underlying state.
#[derive(Clone)]
pub struct WindAdvancedSync {
	/// Application runtime; used here to reach the Tauri application handle
	/// for emitting events.
	runtime:Arc<ApplicationRunTime>,
	/// Tracked documents, pending changes and aggregate status.
	document_sync:Arc<Mutex<DocumentSynchronization>>,
	/// Latest UI state snapshot.
	ui_state_sync:Arc<Mutex<UIStateSynchronization>>,
	/// Subscribers and the queue of updates awaiting broadcast.
	real_time_updates:Arc<Mutex<RealTimeUpdates>>,
	/// Performance counters updated by the monitoring task.
	performance_stats:Arc<Mutex<PerformanceStats>>,
}
415
416impl WindAdvancedSync {
417 pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
419 Self {
420 runtime:runtime.clone(),
421 document_sync:Arc::new(Mutex::new(DocumentSynchronization {
422 synchronized_documents:HashMap::new(),
423 pending_changes:HashMap::new(),
424 last_sync_time:0,
425 sync_status:SyncStatus {
426 total_documents:0,
427 synced_documents:0,
428 conflicted_documents:0,
429 offline_documents:0,
430 last_sync_duration_ms:0,
431 },
432 })),
433 ui_state_sync:Arc::new(Mutex::new(UIStateSynchronization {
434 active_editor:None,
435 cursor_positions:HashMap::new(),
436 selection_ranges:HashMap::new(),
437 view_state:ViewState {
438 zoom_level:1.0,
439 sidebar_visible:true,
440 panel_visible:true,
441 status_bar_visible:true,
442 },
443 theme:"default".to_string(),
444 layout:LayoutState {
445 editor_groups:Vec::new(),
446 active_group:0,
447 grid_layout:GridLayout { rows:1, columns:1, cell_width:100, cell_height:100 },
448 },
449 })),
450 real_time_updates:Arc::new(Mutex::new(RealTimeUpdates {
451 updates:Vec::new(),
452 subscribers:HashMap::new(),
453 update_queue:Vec::new(),
454 last_broadcast:0,
455 })),
456 performance_stats:Arc::new(Mutex::new(PerformanceStats {
457 total_messages_sent:0,
458 total_messages_received:0,
459 average_processing_time_ms:0.0,
460 peak_message_rate:0,
461 error_count:0,
462 last_update:0,
463 connection_uptime:0,
464 })),
465 }
467 }
468
469 pub async fn initialize(&self) -> Result<(), String> {
471 info!("Initializing Wind Advanced Sync service");
472
473 self.start_sync_task().await;
475
476 self.start_performance_monitoring().await;
478
479 info!("Wind Advanced Sync service initialized successfully");
480 Ok(())
481 }
482
483 async fn start_sync_task(&self) {
485 let document_sync = self.document_sync.clone();
486 let runtime = self.runtime.clone();
487
488 tokio::spawn(async move {
489 let mut interval = interval(Duration::from_secs(5));
490
491 loop {
492 interval.tick().await;
493
494 if let Ok(mut sync) = document_sync.lock() {
496 let modified_docs:Vec<String> = sync
497 .synchronized_documents
498 .iter()
499 .filter(|(_, document)| document.sync_state == SyncState::Modified)
500 .map(|(doc_id, _)| doc_id.clone())
501 .collect();
502
503 if !modified_docs.is_empty() {
504 debug!("Synchronizing {} documents", modified_docs.len());
505
506 sync.last_sync_time =
508 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
509
510 sync.sync_status = Self::calculate_sync_status(&sync.synchronized_documents);
512
513 let _ = runtime
515 .Environment
516 .ApplicationHandle
517 .emit("mountain_sync_status_update", sync.sync_status.clone());
518 }
519 }
520 }
521 });
522 }
523
524 async fn start_performance_monitoring(&self) {
526 let performance_stats = self.performance_stats.clone();
527 let runtime = self.runtime.clone();
528
529 tokio::spawn(async move {
530 let mut interval = interval(Duration::from_secs(10));
531
532 loop {
533 interval.tick().await;
534
535 if let Ok(mut stats) = performance_stats.lock() {
536 stats.last_update =
537 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
538 stats.connection_uptime += 10;
539
540 let _ = runtime
542 .Environment
543 .ApplicationHandle
544 .emit("mountain_performance_update", stats.clone());
545 }
546 }
547 });
548 }
549
550 fn calculate_sync_status(documents:&HashMap<String, SynchronizedDocument>) -> SyncStatus {
552 let total = documents.len() as u32;
553 let synced = documents.values().filter(|d| d.sync_state == SyncState::Synced).count() as u32;
554 let conflicted = documents.values().filter(|d| d.sync_state == SyncState::Conflicted).count() as u32;
555 let offline = documents.values().filter(|d| d.sync_state == SyncState::Offline).count() as u32;
556
557 SyncStatus {
558 total_documents:total,
559 synced_documents:synced,
560 conflicted_documents:conflicted,
561 offline_documents:offline,
562 last_sync_duration_ms:0,
563 }
564 }
565
566 pub fn register_commands(_app:&mut tauri::App) -> Result<(), Box<dyn std::error::Error>> {
568 info!("Registering Wind Advanced Sync IPC commands");
569 Ok(())
570 }
571}
572
573impl WindAdvancedSync {
574 pub async fn start_synchronization(self: Arc<Self>) -> Result<(), String> {
576 info!("[WindAdvancedSync] Starting advanced synchronization");
577
578 let sync1 = self.clone();
580 tokio::spawn(async move {
581 sync1.synchronize_documents().await;
582 });
583
584 let sync2 = self.clone();
586 tokio::spawn(async move {
587 sync2.synchronize_ui_state().await;
588 });
589
590 let sync3 = self.clone();
592 tokio::spawn(async move {
593 sync3.broadcast_real_time_updates().await;
594 });
595
596 Ok(())
597 }
598
599 async fn synchronize_documents(&self) {
601 let mut interval = interval(Duration::from_secs(5));
602 let mut consecutive_failures = 0;
603 let max_consecutive_failures = 3;
604
605 loop {
606 interval.tick().await;
607
608 debug!("[WindAdvancedSync] Synchronizing documents");
609
610 let sync_start = std::time::Instant::now();
612 let mut success_count = 0;
613 let mut error_count = 0;
614
615 let changes = self.get_pending_changes().await;
617
618 for change in changes {
620 match self.apply_document_change(change).await {
621 Ok(_) => success_count += 1,
622 Err(e) => {
623 error_count += 1;
624 error!("[WindAdvancedSync] Failed to apply document change: {}", e);
625
626 consecutive_failures += 1;
628 if consecutive_failures >= max_consecutive_failures {
629 warn!("[WindAdvancedSync] Too many consecutive failures, slowing sync interval");
630 interval = tokio::time::interval(Duration::from_secs(30));
633 }
634 },
635 }
636 }
637
638 if success_count > 0 {
640 consecutive_failures = 0;
641 interval = tokio::time::interval(Duration::from_secs(5));
643 }
644
645 self.update_sync_status().await;
647
648 let sync_duration = sync_start.elapsed();
650 trace!(
651 "[WindAdvancedSync] Document sync completed: {} success, {} errors, {:.2}ms",
652 success_count,
653 error_count,
654 sync_duration.as_millis()
655 );
656 }
657 }
658
659 async fn synchronize_ui_state(&self) {
661 let mut interval = interval(Duration::from_secs(1));
662
663 loop {
664 interval.tick().await;
665
666 trace!("[WindAdvancedSync] Synchronizing UI state");
667
668 let ui_state = self.get_ui_state().await;
670
671 if let Err(e) = self.update_ui_state(ui_state).await {
673 error!("[WindAdvancedSync] Failed to update UI state: {}", e);
674 }
675 }
676 }
677
678 async fn broadcast_real_time_updates(&self) {
680 let mut interval = interval(Duration::from_millis(100));
681
682 loop {
683 interval.tick().await;
684
685 let updates = self.get_pending_updates().await;
686
687 if !updates.is_empty() {
688 if let Err(e) = self.broadcast_updates(updates).await {
690 error!("[WindAdvancedSync] Failed to broadcast updates: {}", e);
691 }
692 }
693 }
694 }
695
696 async fn get_pending_changes(&self) -> Vec<DocumentChange> {
698 let sync = self.document_sync.lock().unwrap();
699 sync.pending_changes.values().flatten().cloned().collect()
700 }
701
702 async fn apply_document_change(&self, change:DocumentChange) -> Result<(), String> {
704 debug!("[WindAdvancedSync] Applying document change: {}", change.change_id);
705
706 let change_start = std::time::Instant::now();
708
709 if let Err(conflict) = self.check_for_conflicts(&change).await {
711 warn!("[WindAdvancedSync] Conflict detected: {}", conflict);
712 return Err(format!("Conflict detected: {}", conflict));
713 }
714
715 match change.change_type {
717 ChangeType::Update => {
718 if let Some(_content) = &change.content {
720 }
729 },
730 ChangeType::Insert => {
731 if let Some(_content) = &change.content {
733 }
742 },
743 ChangeType::Delete => {
744 },
753 _ => {
754 warn!("[WindAdvancedSync] Unsupported change type: {:?}", change.change_type);
755 },
756 }
757
758 let mut sync = self.document_sync.lock().unwrap();
760 if let Some(changes) = sync.pending_changes.get_mut(&change.document_id) {
761 if let Some(change_idx) = changes.iter().position(|c| c.change_id == change.change_id) {
762 changes[change_idx].applied = true;
763 }
764 }
765
766 let change_duration = change_start.elapsed();
768 trace!(
769 "[WindAdvancedSync] Change applied successfully in {:.2}ms: {}",
770 change_duration.as_millis(),
771 change.change_id
772 );
773
774 Ok(())
775 }
776
777 async fn check_for_conflicts(&self, change:&DocumentChange) -> Result<(), String> {
779 let sync = self.document_sync.lock().unwrap();
780
781 if let Some(document) = sync.synchronized_documents.get(&change.document_id) {
783 let current_time = SystemTime::now()
784 .duration_since(SystemTime::UNIX_EPOCH)
785 .unwrap_or_default()
786 .as_secs();
787
788 if current_time - document.last_modified < 10 {
791 return Err(format!(
792 "Document {} was modified recently ({}s ago)",
793 document.document_id,
794 current_time - document.last_modified
795 ));
796 }
797
798 if matches!(document.sync_state, SyncState::Conflicted) {
800 return Err(format!("Document {} is in conflicted state", document.document_id));
801 }
802 }
803
804 Ok(())
805 }
806
807 async fn update_sync_status(&self) {
809 let mut sync = self.document_sync.lock().unwrap();
810
811 sync.sync_status.total_documents = sync.synchronized_documents.len() as u32;
812 sync.sync_status.synced_documents = sync
813 .synchronized_documents
814 .values()
815 .filter(|doc| matches!(doc.sync_state, SyncState::Synced))
816 .count() as u32;
817 sync.sync_status.conflicted_documents = sync
818 .synchronized_documents
819 .values()
820 .filter(|doc| matches!(doc.sync_state, SyncState::Conflicted))
821 .count() as u32;
822 sync.sync_status.offline_documents = sync
823 .synchronized_documents
824 .values()
825 .filter(|doc| matches!(doc.sync_state, SyncState::Offline))
826 .count() as u32;
827
828 sync.last_sync_time = SystemTime::now()
829 .duration_since(SystemTime::UNIX_EPOCH)
830 .unwrap_or_default()
831 .as_secs();
832 }
833
834 async fn get_ui_state(&self) -> UIStateSynchronization {
836 let sync = self.ui_state_sync.lock().unwrap();
837 sync.clone()
838 }
839
840 async fn update_ui_state(&self, ui_state:UIStateSynchronization) -> Result<(), String> {
842 let mut sync = self.ui_state_sync.lock().unwrap();
843 *sync = ui_state;
844
845 Ok(())
851 }
852
853 async fn get_pending_updates(&self) -> Vec<RealTimeUpdate> {
855 let mut updates = self.real_time_updates.lock().unwrap();
856 let pending = updates.update_queue.clone();
857 updates.update_queue.clear();
858 pending
859 }
860
861 async fn broadcast_updates(&self, updates:Vec<RealTimeUpdate>) -> Result<(), String> {
863 for update in updates {
864 let subscribers = {
866 let rt = self.real_time_updates.lock().unwrap();
867 rt.subscribers.get(&update.target).cloned()
868 };
869
870 if let Some(subscriber_list) = subscribers {
872 for subscriber in subscriber_list {
873 if let Err(e) = self
874 .runtime
875 .Environment
876 .ApplicationHandle
877 .emit(&format!("real-time-update-{}", subscriber), &update)
878 {
879 error!("[WindAdvancedSync] Failed to broadcast to {}: {}", subscriber, e);
880 }
881 }
882 }
883 }
884
885 Ok(())
886 }
887
888 pub async fn add_document(&self, document_id:String, file_path:String) -> Result<(), String> {
890 let mut sync = self.document_sync.lock().unwrap();
891
892 let document = SynchronizedDocument {
893 document_id:document_id.clone(),
894 file_path,
895 last_modified:SystemTime::now()
896 .duration_since(SystemTime::UNIX_EPOCH)
897 .unwrap_or_default()
898 .as_secs(),
899 content_hash:"".to_string(),
900 sync_state:SyncState::Synced,
901 version:1,
902 };
903
904 sync.synchronized_documents.insert(document_id, document);
905
906 debug!("[WindAdvancedSync] Document added for synchronization");
907 Ok(())
908 }
909
910 pub async fn subscribe_to_updates(&self, target:String, subscriber:String) -> Result<(), String> {
912 let mut updates = self.real_time_updates.lock().unwrap();
913
914 let target_clone = target.clone();
915 updates
916 .subscribers
917 .entry(target_clone.clone())
918 .or_insert_with(Vec::new)
919 .push(subscriber);
920
921 debug!("[WindAdvancedSync] Subscriber added for target: {}", target_clone);
922 Ok(())
923 }
924
925 pub async fn queue_update(&self, update:RealTimeUpdate) -> Result<(), String> {
927 let mut updates = self.real_time_updates.lock().unwrap();
928
929 updates.update_queue.push(update);
930 updates.last_broadcast = SystemTime::now()
931 .duration_since(SystemTime::UNIX_EPOCH)
932 .unwrap_or_default()
933 .as_secs();
934
935 trace!("[WindAdvancedSync] Update queued");
936 Ok(())
937 }
938
939 pub async fn get_sync_status(&self) -> SyncStatus {
941 let sync = self.document_sync.lock().unwrap();
942 sync.sync_status.clone()
943 }
944
945 pub async fn get_current_ui_state(&self) -> UIStateSynchronization { self.get_ui_state().await }
947
948 fn clone_sync(&self) -> WindAdvancedSync {
950 WindAdvancedSync {
951 runtime:self.runtime.clone(),
952 document_sync:self.document_sync.clone(),
953 ui_state_sync:self.ui_state_sync.clone(),
954 real_time_updates:self.real_time_updates.clone(),
955 performance_stats:self.performance_stats.clone(),
956 }
958 }
959}
960
961#[tauri::command]
963pub async fn mountain_add_document_for_sync(
964 app_handle:tauri::AppHandle,
965 document_id:String,
966 file_path:String,
967) -> Result<(), String> {
968 debug!("[WindAdvancedSync] Tauri command: add_document_for_sync");
969
970 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
971 sync.add_document(document_id, file_path).await
972 } else {
973 Err("WindAdvancedSync not found in application state".to_string())
974 }
975}
976
977#[tauri::command]
979pub async fn mountain_get_sync_status(app_handle:tauri::AppHandle) -> Result<SyncStatus, String> {
980 debug!("[WindAdvancedSync] Tauri command: get_sync_status");
981
982 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
983 Ok(sync.get_sync_status().await)
984 } else {
985 Err("WindAdvancedSync not found in application state".to_string())
986 }
987}
988
989#[tauri::command]
991pub async fn mountain_subscribe_to_updates(
992 app_handle:tauri::AppHandle,
993 target:String,
994 subscriber:String,
995) -> Result<(), String> {
996 debug!("[WindAdvancedSync] Tauri command: subscribe_to_updates");
997
998 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
999 sync.subscribe_to_updates(target, subscriber).await
1000 } else {
1001 Err("WindAdvancedSync not found in application state".to_string())
1002 }
1003}
1004
1005pub fn initialize_wind_advanced_sync(
1007 app_handle:&tauri::AppHandle,
1008 runtime:Arc<ApplicationRunTime>,
1009) -> Result<(), String> {
1010 info!("[WindAdvancedSync] Initializing Wind advanced synchronization");
1011
1012 let sync = Arc::new(WindAdvancedSync::new(runtime));
1013
1014 app_handle.manage(sync.clone());
1016
1017 let sync_clone = sync.clone();
1019 tokio::spawn(async move {
1020 if let Err(e) = sync_clone.start_synchronization().await {
1021 error!("[WindAdvancedSync] Failed to start synchronization: {}", e);
1022 }
1023 });
1024
1025 Ok(())
1026}