1use std::{
277 collections::HashMap,
278 sync::{Arc, Mutex},
279 time::{Duration, SystemTime},
280};
281
282use serde::{Deserialize, Serialize};
283use tokio::time::interval;
284use tauri::{Emitter, Manager};
285
286use crate::{IPC::AdvancedFeatures::PerformanceStats, RunTime::ApplicationRunTime::ApplicationRunTime, dev_log};
287
288#[derive(Clone, Serialize, Deserialize, Debug)]
295pub struct SyncStatus {
296 pub total_documents:u32,
297 pub synced_documents:u32,
298 pub conflicted_documents:u32,
299 pub offline_documents:u32,
300 pub last_sync_duration_ms:u64,
301}
302
303#[derive(Clone, Copy, PartialEq, Debug)]
305pub enum SyncState {
306 Modified,
307 Synced,
308 Conflicted,
309 Offline,
310}
311
312#[derive(Clone, Copy, Debug)]
314pub enum ChangeType {
315 Update,
316 Insert,
317 Delete,
318 Move,
319 Other,
320}
321
322#[derive(Clone, Debug)]
324pub struct SynchronizedDocument {
325 pub document_id:String,
326 pub file_path:String,
327 pub last_modified:u64,
328 pub content_hash:String,
329 pub sync_state:SyncState,
330 pub version:u32,
331}
332
333#[derive(Clone, Debug)]
335pub struct DocumentChange {
336 pub change_id:String,
337 pub document_id:String,
338 pub change_type:ChangeType,
339 pub content:Option<String>,
340 pub applied:bool,
341}
342
343pub struct DocumentSynchronization {
345 pub synchronized_documents:HashMap<String, SynchronizedDocument>,
346 pub pending_changes:HashMap<String, Vec<DocumentChange>>,
347 pub last_sync_time:u64,
348 pub sync_status:SyncStatus,
349}
350
351#[derive(Clone, Serialize, Deserialize, Debug)]
353pub struct RealTimeUpdate {
354 pub target:String,
355 pub data:String,
356}
357
358pub struct RealTimeUpdateManager {
360 pub Updates:Vec<RealTimeUpdate>,
361 pub Subscribers:HashMap<String, Vec<String>>,
362 pub UpdateQueue:Vec<RealTimeUpdate>,
363 pub LastBroadcast:u64,
364}
365
366#[derive(Clone, Debug)]
368pub struct ViewState {
369 pub zoom_level:f32,
370 pub sidebar_visible:bool,
371 pub panel_visible:bool,
372 pub status_bar_visible:bool,
373}
374
375#[derive(Clone, Debug)]
377pub struct GridLayout {
378 pub rows:u32,
379 pub columns:u32,
380 pub cell_width:u32,
381 pub cell_height:u32,
382}
383
384#[derive(Clone, Debug)]
386pub struct LayoutState {
387 pub editor_groups:Vec<String>,
388 pub active_group:u32,
389 pub grid_layout:GridLayout,
390}
391
392#[derive(Clone, Debug)]
394pub struct UIStateSynchronization {
395 pub active_editor:Option<String>,
396 pub cursor_positions:HashMap<String, (u32, u32)>,
397 pub selection_ranges:HashMap<String, (u32, u32)>,
398 pub view_state:ViewState,
399 pub theme:String,
400 pub layout:LayoutState,
401}
402
403#[derive(Clone)]
405pub struct WindAdvancedSync {
406 runtime:Arc<ApplicationRunTime>,
407 document_sync:Arc<Mutex<DocumentSynchronization>>,
408 ui_state_sync:Arc<Mutex<UIStateSynchronization>>,
409 real_time_updates:Arc<Mutex<RealTimeUpdateManager>>,
410 performance_stats:Arc<Mutex<PerformanceStats>>,
411 }
413
414impl WindAdvancedSync {
415 pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
417 Self {
418 runtime:runtime.clone(),
419 document_sync:Arc::new(Mutex::new(DocumentSynchronization {
420 synchronized_documents:HashMap::new(),
421 pending_changes:HashMap::new(),
422 last_sync_time:0,
423 sync_status:SyncStatus {
424 total_documents:0,
425 synced_documents:0,
426 conflicted_documents:0,
427 offline_documents:0,
428 last_sync_duration_ms:0,
429 },
430 })),
431 ui_state_sync:Arc::new(Mutex::new(UIStateSynchronization {
432 active_editor:None,
433 cursor_positions:HashMap::new(),
434 selection_ranges:HashMap::new(),
435 view_state:ViewState {
436 zoom_level:1.0,
437 sidebar_visible:true,
438 panel_visible:true,
439 status_bar_visible:true,
440 },
441 theme:"default".to_string(),
442 layout:LayoutState {
443 editor_groups:Vec::new(),
444 active_group:0,
445 grid_layout:GridLayout { rows:1, columns:1, cell_width:100, cell_height:100 },
446 },
447 })),
448 real_time_updates:Arc::new(Mutex::new(RealTimeUpdateManager {
449 Updates:Vec::new(),
450 Subscribers:HashMap::new(),
451 UpdateQueue:Vec::new(),
452 LastBroadcast:0,
453 })),
454 performance_stats:Arc::new(Mutex::new(PerformanceStats {
455 total_messages_sent:0,
456 total_messages_received:0,
457 average_processing_time_ms:0.0,
458 peak_message_rate:0,
459 error_count:0,
460 last_update:0,
461 connection_uptime:0,
462 })),
463 }
465 }
466
467 pub async fn initialize(&self) -> Result<(), String> {
469 dev_log!("ipc", "Initializing Wind Advanced Sync service");
470
471 self.start_sync_task().await;
473
474 self.start_performance_monitoring().await;
476
477 dev_log!("ipc", "Wind Advanced Sync service initialized successfully");
478 Ok(())
479 }
480
481 async fn start_sync_task(&self) {
483 let document_sync = self.document_sync.clone();
484 let runtime = self.runtime.clone();
485
486 tokio::spawn(async move {
487 let mut interval = interval(Duration::from_secs(5));
488
489 loop {
490 interval.tick().await;
491
492 if let Ok(mut sync) = document_sync.lock() {
494 let modified_docs:Vec<String> = sync
495 .synchronized_documents
496 .iter()
497 .filter(|(_, document)| document.sync_state == SyncState::Modified)
498 .map(|(doc_id, _)| doc_id.clone())
499 .collect();
500
501 if !modified_docs.is_empty() {
502 dev_log!("ipc", "Synchronizing {} documents", modified_docs.len());
503
504 sync.last_sync_time =
506 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
507
508 sync.sync_status = Self::calculate_sync_status(&sync.synchronized_documents);
510
511 if std::env::var("LAND_SYNC_STATUS_EMIT").is_ok() {
518 let _ = runtime
519 .Environment
520 .ApplicationHandle
521 .emit("mountain_sync_status_update", sync.sync_status.clone());
522 }
523 }
524 }
525 }
526 });
527 }
528
529 async fn start_performance_monitoring(&self) {
531 let performance_stats = self.performance_stats.clone();
532 let runtime = self.runtime.clone();
533
534 tokio::spawn(async move {
535 let mut interval = interval(Duration::from_secs(10));
536
537 loop {
538 interval.tick().await;
539
540 if let Ok(mut stats) = performance_stats.lock() {
541 stats.last_update =
542 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
543 stats.connection_uptime += 10;
544
545 if std::env::var("LAND_PERF_EMIT").is_ok() {
550 let _ = runtime
551 .Environment
552 .ApplicationHandle
553 .emit("mountain_performance_update", stats.clone());
554 }
555 }
556 }
557 });
558 }
559
560 fn calculate_sync_status(documents:&HashMap<String, SynchronizedDocument>) -> SyncStatus {
562 let total = documents.len() as u32;
563 let synced = documents.values().filter(|d| d.sync_state == SyncState::Synced).count() as u32;
564 let conflicted = documents.values().filter(|d| d.sync_state == SyncState::Conflicted).count() as u32;
565 let offline = documents.values().filter(|d| d.sync_state == SyncState::Offline).count() as u32;
566
567 SyncStatus {
568 total_documents:total,
569 synced_documents:synced,
570 conflicted_documents:conflicted,
571 offline_documents:offline,
572 last_sync_duration_ms:0,
573 }
574 }
575
576 pub fn register_commands(_app:&mut tauri::App) -> Result<(), Box<dyn std::error::Error>> {
578 dev_log!("ipc", "Registering Wind Advanced Sync IPC commands");
579 Ok(())
580 }
581}
582
583impl WindAdvancedSync {
584 pub async fn start_synchronization(self: Arc<Self>) -> Result<(), String> {
586 dev_log!("lifecycle", "Starting advanced synchronization");
587
588 let sync1 = self.clone();
590 tokio::spawn(async move {
591 sync1.synchronize_documents().await;
592 });
593
594 let sync2 = self.clone();
596 tokio::spawn(async move {
597 sync2.synchronize_ui_state().await;
598 });
599
600 let sync3 = self.clone();
602 tokio::spawn(async move {
603 sync3.broadcast_real_time_updates().await;
604 });
605
606 Ok(())
607 }
608
609 async fn synchronize_documents(&self) {
611 let mut interval = interval(Duration::from_secs(5));
612 let mut consecutive_failures = 0;
613 let max_consecutive_failures = 3;
614
615 loop {
616 interval.tick().await;
617
618 dev_log!("lifecycle", "Synchronizing documents");
619
620 let sync_start = std::time::Instant::now();
622 let mut success_count = 0;
623 let mut error_count = 0;
624
625 let changes = self.get_pending_changes().await;
627
628 for change in changes {
630 match self.apply_document_change(change).await {
631 Ok(_) => success_count += 1,
632 Err(e) => {
633 error_count += 1;
634 dev_log!("ipc", "error: [WindAdvancedSync] Failed to apply document change: {}", e);
635
636 consecutive_failures += 1;
638 if consecutive_failures >= max_consecutive_failures {
639 dev_log!("lifecycle", "Too many consecutive failures, slowing sync interval");
640 interval = tokio::time::interval(Duration::from_secs(30));
643 }
644 },
645 }
646 }
647
648 if success_count > 0 {
650 consecutive_failures = 0;
651 interval = tokio::time::interval(Duration::from_secs(5));
653 }
654
655 self.update_sync_status().await;
657
658 let sync_duration = sync_start.elapsed();
660 dev_log!(
661 "ipc",
662 "[WindAdvancedSync] Document sync completed: {} success, {} errors, {:.2}ms",
663 success_count,
664 error_count,
665 sync_duration.as_millis()
666 );
667 }
668 }
669
670 async fn synchronize_ui_state(&self) {
672 let mut interval = interval(Duration::from_secs(1));
673
674 loop {
675 interval.tick().await;
676
677 dev_log!("ipc", "[WindAdvancedSync] Synchronizing UI state");
678
679 let ui_state = self.get_ui_state().await;
681
682 if let Err(e) = self.update_ui_state(ui_state).await {
684 dev_log!("ipc", "error: [WindAdvancedSync] Failed to update UI state: {}", e);
685 }
686 }
687 }
688
689 async fn broadcast_real_time_updates(&self) {
691 let mut interval = interval(Duration::from_millis(100));
692
693 loop {
694 interval.tick().await;
695
696 {
702 let rt = self.real_time_updates.lock().unwrap();
703 if rt.Subscribers.is_empty() {
704 continue;
705 }
706 }
707
708 let updates = self.get_pending_updates().await;
709
710 if !updates.is_empty() {
711 if let Err(e) = self.broadcast_updates(updates).await {
713 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast updates: {}", e);
714 }
715 }
716 }
717 }
718
719 async fn get_pending_changes(&self) -> Vec<DocumentChange> {
721 let sync = self.document_sync.lock().unwrap();
722 sync.pending_changes.values().flatten().cloned().collect()
723 }
724
725 async fn apply_document_change(&self, change:DocumentChange) -> Result<(), String> {
727 dev_log!("lifecycle", "Applying document change: {}", change.change_id);
728
729 let change_start = std::time::Instant::now();
731
732 if let Err(conflict) = self.check_for_conflicts(&change).await {
734 dev_log!("lifecycle", "Conflict detected: {}", conflict);
735 return Err(format!("Conflict detected: {}", conflict));
736 }
737
738 match change.change_type {
740 ChangeType::Update => {
741 if let Some(_content) = &change.content {
743 }
752 },
753 ChangeType::Insert => {
754 if let Some(_content) = &change.content {
756 }
765 },
766 ChangeType::Delete => {
767 },
776 _ => {
777 dev_log!("lifecycle", "Unsupported change type: {:?}", change.change_type);
778 },
779 }
780
781 let mut sync = self.document_sync.lock().unwrap();
783 if let Some(changes) = sync.pending_changes.get_mut(&change.document_id) {
784 if let Some(change_idx) = changes.iter().position(|c| c.change_id == change.change_id) {
785 changes[change_idx].applied = true;
786 }
787 }
788
789 let change_duration = change_start.elapsed();
791 dev_log!(
792 "ipc",
793 "[WindAdvancedSync] Change applied successfully in {:.2}ms: {}",
794 change_duration.as_millis(),
795 change.change_id
796 );
797
798 Ok(())
799 }
800
801 async fn check_for_conflicts(&self, change:&DocumentChange) -> Result<(), String> {
803 let sync = self.document_sync.lock().unwrap();
804
805 if let Some(document) = sync.synchronized_documents.get(&change.document_id) {
807 let current_time = SystemTime::now()
808 .duration_since(SystemTime::UNIX_EPOCH)
809 .unwrap_or_default()
810 .as_secs();
811
812 if current_time - document.last_modified < 10 {
815 return Err(format!(
816 "Document {} was modified recently ({}s ago)",
817 document.document_id,
818 current_time - document.last_modified
819 ));
820 }
821
822 if matches!(document.sync_state, SyncState::Conflicted) {
824 return Err(format!("Document {} is in conflicted state", document.document_id));
825 }
826 }
827
828 Ok(())
829 }
830
831 async fn update_sync_status(&self) {
833 let mut sync = self.document_sync.lock().unwrap();
834
835 sync.sync_status.total_documents = sync.synchronized_documents.len() as u32;
836 sync.sync_status.synced_documents = sync
837 .synchronized_documents
838 .values()
839 .filter(|doc| matches!(doc.sync_state, SyncState::Synced))
840 .count() as u32;
841 sync.sync_status.conflicted_documents = sync
842 .synchronized_documents
843 .values()
844 .filter(|doc| matches!(doc.sync_state, SyncState::Conflicted))
845 .count() as u32;
846 sync.sync_status.offline_documents = sync
847 .synchronized_documents
848 .values()
849 .filter(|doc| matches!(doc.sync_state, SyncState::Offline))
850 .count() as u32;
851
852 sync.last_sync_time = SystemTime::now()
853 .duration_since(SystemTime::UNIX_EPOCH)
854 .unwrap_or_default()
855 .as_secs();
856 }
857
858 async fn get_ui_state(&self) -> UIStateSynchronization {
860 let sync = self.ui_state_sync.lock().unwrap();
861 sync.clone()
862 }
863
864 async fn update_ui_state(&self, ui_state:UIStateSynchronization) -> Result<(), String> {
866 let mut sync = self.ui_state_sync.lock().unwrap();
867 *sync = ui_state;
868
869 Ok(())
875 }
876
877 async fn get_pending_updates(&self) -> Vec<RealTimeUpdate> {
879 let mut updates = self.real_time_updates.lock().unwrap();
880 let pending = updates.UpdateQueue.clone();
881 updates.UpdateQueue.clear();
882 pending
883 }
884
885 async fn broadcast_updates(&self, updates:Vec<RealTimeUpdate>) -> Result<(), String> {
887 for update in updates {
888 let subscribers = {
890 let rt = self.real_time_updates.lock().unwrap();
891 rt.Subscribers.get(&update.target).cloned()
892 };
893
894 if let Some(subscriber_list) = subscribers {
896 for subscriber in subscriber_list {
897 if let Err(e) = self
898 .runtime
899 .Environment
900 .ApplicationHandle
901 .emit(&format!("real-time-update-{}", subscriber), &update)
902 {
903 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast to {}: {}", subscriber, e);
904 }
905 }
906 }
907 }
908
909 Ok(())
910 }
911
912 pub async fn add_document(&self, document_id:String, file_path:String) -> Result<(), String> {
914 let mut sync = self.document_sync.lock().unwrap();
915
916 let document = SynchronizedDocument {
917 document_id:document_id.clone(),
918 file_path,
919 last_modified:SystemTime::now()
920 .duration_since(SystemTime::UNIX_EPOCH)
921 .unwrap_or_default()
922 .as_secs(),
923 content_hash:"".to_string(),
924 sync_state:SyncState::Synced,
925 version:1,
926 };
927
928 sync.synchronized_documents.insert(document_id, document);
929
930 dev_log!("lifecycle", "Document added for synchronization");
931 Ok(())
932 }
933
934 pub async fn subscribe_to_updates(&self, target:String, subscriber:String) -> Result<(), String> {
936 let mut updates = self.real_time_updates.lock().unwrap();
937
938 let target_clone = target.clone();
939 updates
940 .Subscribers
941 .entry(target_clone.clone())
942 .or_insert_with(Vec::new)
943 .push(subscriber);
944
945 dev_log!("lifecycle", "Subscriber added for target: {}", target_clone);
946 Ok(())
947 }
948
949 pub async fn queue_update(&self, update:RealTimeUpdate) -> Result<(), String> {
951 let mut updates = self.real_time_updates.lock().unwrap();
952
953 updates.UpdateQueue.push(update);
954 updates.LastBroadcast = SystemTime::now()
955 .duration_since(SystemTime::UNIX_EPOCH)
956 .unwrap_or_default()
957 .as_secs();
958
959 dev_log!("ipc", "[WindAdvancedSync] Update queued");
960 Ok(())
961 }
962
963 pub async fn get_sync_status(&self) -> SyncStatus {
965 let sync = self.document_sync.lock().unwrap();
966 sync.sync_status.clone()
967 }
968
969 pub async fn get_current_ui_state(&self) -> UIStateSynchronization { self.get_ui_state().await }
971
972 #[allow(dead_code)]
974 fn clone_sync(&self) -> WindAdvancedSync {
975 WindAdvancedSync {
976 runtime:self.runtime.clone(),
977 document_sync:self.document_sync.clone(),
978 ui_state_sync:self.ui_state_sync.clone(),
979 real_time_updates:self.real_time_updates.clone(),
980 performance_stats:self.performance_stats.clone(),
981 }
983 }
984}
985
986#[tauri::command]
988pub async fn mountain_add_document_for_sync(
989 app_handle:tauri::AppHandle,
990 document_id:String,
991 file_path:String,
992) -> Result<(), String> {
993 dev_log!("lifecycle", "Tauri command: add_document_for_sync");
994
995 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
996 sync.add_document(document_id, file_path).await
997 } else {
998 Err("WindAdvancedSync not found in application state".to_string())
999 }
1000}
1001
1002#[tauri::command]
1004pub async fn mountain_get_sync_status(app_handle:tauri::AppHandle) -> Result<SyncStatus, String> {
1005 dev_log!("lifecycle", "Tauri command: get_sync_status");
1006
1007 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1008 Ok(sync.get_sync_status().await)
1009 } else {
1010 Err("WindAdvancedSync not found in application state".to_string())
1011 }
1012}
1013
1014#[tauri::command]
1016pub async fn mountain_subscribe_to_updates(
1017 app_handle:tauri::AppHandle,
1018 target:String,
1019 subscriber:String,
1020) -> Result<(), String> {
1021 dev_log!("lifecycle", "Tauri command: subscribe_to_updates");
1022
1023 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1024 sync.subscribe_to_updates(target, subscriber).await
1025 } else {
1026 Err("WindAdvancedSync not found in application state".to_string())
1027 }
1028}
1029
1030pub fn initialize_wind_advanced_sync(
1032 app_handle:&tauri::AppHandle,
1033 runtime:Arc<ApplicationRunTime>,
1034) -> Result<(), String> {
1035 dev_log!("lifecycle", "Initializing Wind advanced synchronization");
1036
1037 let sync = Arc::new(WindAdvancedSync::new(runtime));
1038
1039 app_handle.manage(sync.clone());
1041
1042 let sync_clone = sync.clone();
1044 tokio::spawn(async move {
1045 if let Err(e) = sync_clone.start_synchronization().await {
1046 dev_log!("ipc", "error: [WindAdvancedSync] Failed to start synchronization: {}", e);
1047 }
1048 });
1049
1050 Ok(())
1051}