1use std::{
277 collections::HashMap,
278 sync::{Arc, Mutex},
279 time::{Duration, SystemTime},
280};
281
282use serde::{Deserialize, Serialize};
283use tokio::time::interval;
284use tauri::{Emitter, Manager};
285
286use crate::{
287 IPC::AdvancedFeatures::PerformanceStats::Struct as PerformanceStats,
288 RunTime::ApplicationRunTime::ApplicationRunTime,
289 dev_log,
290};
291
/// Aggregate document counters reported to callers and, optionally, emitted
/// to the front end as the payload of `mountain_sync_status_update`.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SyncStatus {
	/// Number of documents currently tracked for synchronization.
	pub total_documents:u32,
	/// Tracked documents whose state is `SyncState::Synced`.
	pub synced_documents:u32,
	/// Tracked documents whose state is `SyncState::Conflicted`.
	pub conflicted_documents:u32,
	/// Tracked documents whose state is `SyncState::Offline`.
	pub offline_documents:u32,
	/// Duration of the last sync pass in milliseconds.
	/// NOTE(review): every writer visible in this file stores 0 here.
	pub last_sync_duration_ms:u64,
}
306
/// Synchronization state of a single tracked document.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SyncState {
	/// Counted by the periodic sync task as needing synchronization.
	Modified,
	/// Initial state assigned when a document is added for synchronization.
	Synced,
	/// Changes targeting a document in this state are rejected by the conflict check.
	Conflicted,
	/// Counted separately in `SyncStatus::offline_documents`.
	Offline,
}
315
/// Kind of edit described by a `DocumentChange`.
///
/// `Update`, `Insert` and `Delete` have dedicated (currently empty) branches
/// when a change is applied; `Move` and `Other` are logged as unsupported.
#[derive(Clone, Copy, Debug)]
pub enum ChangeType {
	Update,
	Insert,
	Delete,
	Move,
	Other,
}
325
/// A document tracked by the synchronization service.
#[derive(Clone, Debug)]
pub struct SynchronizedDocument {
	/// Identifier; also the key under which the document is stored.
	pub document_id:String,
	/// Path supplied by the caller when the document was added.
	pub file_path:String,
	/// Unix timestamp in seconds; set to "now" when the document is added and
	/// compared against the current time by the conflict check.
	pub last_modified:u64,
	/// Content hash; initialised to the empty string when a document is added.
	pub content_hash:String,
	/// Current synchronization state.
	pub sync_state:SyncState,
	/// Version counter; starts at 1 when a document is added.
	pub version:u32,
}
336
/// A single pending edit against a tracked document.
#[derive(Clone, Debug)]
pub struct DocumentChange {
	/// Identifier used in log output and to locate the change in its pending list.
	pub change_id:String,
	/// Identifier of the target document; key into the pending-changes map.
	pub document_id:String,
	/// Kind of edit.
	pub change_type:ChangeType,
	/// Optional payload of the edit.
	pub content:Option<String>,
	/// Set to `true` once the change has been processed successfully.
	pub applied:bool,
}
346
/// Mutable document-sync state shared (behind a mutex) between the background
/// tasks and the public API.
pub struct DocumentSynchronization {
	/// Tracked documents keyed by document id.
	pub synchronized_documents:HashMap<String, SynchronizedDocument>,
	/// Pending changes grouped by the id of the document they target.
	pub pending_changes:HashMap<String, Vec<DocumentChange>>,
	/// Unix timestamp of the last status refresh.
	/// NOTE(review): the periodic sync task writes milliseconds here while
	/// `update_sync_status` writes seconds — confirm the intended unit.
	pub last_sync_time:u64,
	/// Most recently computed aggregate counters.
	pub sync_status:SyncStatus,
}
354
/// A message queued for delivery to subscribers; the whole struct is the
/// payload of the emitted `real-time-update-<subscriber>` event.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RealTimeUpdate {
	/// Topic name; used to look up the list of subscribers to notify.
	pub target:String,
	/// Message body, passed through unchanged.
	pub data:String,
}
361
/// Mutable state of the real-time update broadcaster.
pub struct RealTimeUpdateManager {
	/// NOTE(review): initialised empty and not otherwise touched in this file.
	pub Updates:Vec<RealTimeUpdate>,
	/// Subscriber names grouped by the target they subscribed to.
	pub Subscribers:HashMap<String, Vec<String>>,
	/// Updates waiting to be drained by the broadcast loop.
	pub UpdateQueue:Vec<RealTimeUpdate>,
	/// Unix timestamp in seconds, stamped each time an update is queued.
	pub LastBroadcast:u64,
}
369
/// Visibility and zoom flags of the workbench view.
///
/// The service starts with zoom `1.0` and all three flags set to `true`.
#[derive(Clone, Debug)]
pub struct ViewState {
	pub zoom_level:f32,
	pub sidebar_visible:bool,
	pub panel_visible:bool,
	pub status_bar_visible:bool,
}
378
/// Dimensions of the editor grid.
///
/// The service starts with a 1x1 grid whose cell is 100 by 100.
/// NOTE(review): the unit of `cell_width`/`cell_height` is not visible here.
#[derive(Clone, Debug)]
pub struct GridLayout {
	pub rows:u32,
	pub columns:u32,
	pub cell_width:u32,
	pub cell_height:u32,
}
387
/// Editor-group layout of the workbench.
#[derive(Clone, Debug)]
pub struct LayoutState {
	/// Editor groups; starts empty.
	pub editor_groups:Vec<String>,
	/// Active group; starts at 0.
	/// NOTE(review): presumably an index into `editor_groups` — confirm.
	pub active_group:u32,
	/// Grid dimensions for the groups.
	pub grid_layout:GridLayout,
}
395
/// Snapshot of UI state held by the service and handed out by value.
#[derive(Clone, Debug)]
pub struct UIStateSynchronization {
	/// Currently active editor, if any; starts as `None`.
	pub active_editor:Option<String>,
	/// Cursor positions by string key.
	/// NOTE(review): presumably keyed by editor id with (line, column) values — confirm.
	pub cursor_positions:HashMap<String, (u32, u32)>,
	/// Selection ranges by string key.
	/// NOTE(review): presumably keyed by editor id with (start, end) values — confirm.
	pub selection_ranges:HashMap<String, (u32, u32)>,
	/// Visibility and zoom flags.
	pub view_state:ViewState,
	/// Theme name; starts as `"default"`.
	pub theme:String,
	/// Editor-group layout.
	pub layout:LayoutState,
}
406
/// Synchronization service handle.
///
/// Every field is an `Arc`, so cloning the handle yields another handle onto
/// the same shared state rather than an independent copy.
#[derive(Clone)]
pub struct WindAdvancedSync {
	/// Application runtime; used to reach the Tauri application handle for emitting events.
	runtime:Arc<ApplicationRunTime>,
	/// Tracked documents, pending changes and aggregate status.
	document_sync:Arc<Mutex<DocumentSynchronization>>,
	/// Latest UI state snapshot.
	ui_state_sync:Arc<Mutex<UIStateSynchronization>>,
	/// Subscribers and queued real-time updates.
	real_time_updates:Arc<Mutex<RealTimeUpdateManager>>,
	/// Counters refreshed by the performance-monitoring task.
	performance_stats:Arc<Mutex<PerformanceStats>>,
}
417
418impl WindAdvancedSync {
419 pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
421 Self {
422 runtime:runtime.clone(),
423 document_sync:Arc::new(Mutex::new(DocumentSynchronization {
424 synchronized_documents:HashMap::new(),
425 pending_changes:HashMap::new(),
426 last_sync_time:0,
427 sync_status:SyncStatus {
428 total_documents:0,
429 synced_documents:0,
430 conflicted_documents:0,
431 offline_documents:0,
432 last_sync_duration_ms:0,
433 },
434 })),
435 ui_state_sync:Arc::new(Mutex::new(UIStateSynchronization {
436 active_editor:None,
437 cursor_positions:HashMap::new(),
438 selection_ranges:HashMap::new(),
439 view_state:ViewState {
440 zoom_level:1.0,
441 sidebar_visible:true,
442 panel_visible:true,
443 status_bar_visible:true,
444 },
445 theme:"default".to_string(),
446 layout:LayoutState {
447 editor_groups:Vec::new(),
448 active_group:0,
449 grid_layout:GridLayout { rows:1, columns:1, cell_width:100, cell_height:100 },
450 },
451 })),
452 real_time_updates:Arc::new(Mutex::new(RealTimeUpdateManager {
453 Updates:Vec::new(),
454 Subscribers:HashMap::new(),
455 UpdateQueue:Vec::new(),
456 LastBroadcast:0,
457 })),
458 performance_stats:Arc::new(Mutex::new(PerformanceStats {
459 total_messages_sent:0,
460 total_messages_received:0,
461 average_processing_time_ms:0.0,
462 peak_message_rate:0,
463 error_count:0,
464 last_update:0,
465 connection_uptime:0,
466 })),
467 }
469 }
470
471 pub async fn initialize(&self) -> Result<(), String> {
473 dev_log!("ipc", "Initializing Wind Advanced Sync service");
474
475 self.start_sync_task().await;
477
478 self.start_performance_monitoring().await;
480
481 dev_log!("ipc", "Wind Advanced Sync service initialized successfully");
482 Ok(())
483 }
484
485 async fn start_sync_task(&self) {
487 let document_sync = self.document_sync.clone();
488 let runtime = self.runtime.clone();
489
490 tokio::spawn(async move {
491 let mut interval = interval(Duration::from_secs(5));
492
493 loop {
494 interval.tick().await;
495
496 if let Ok(mut sync) = document_sync.lock() {
498 let modified_docs:Vec<String> = sync
499 .synchronized_documents
500 .iter()
501 .filter(|(_, document)| document.sync_state == SyncState::Modified)
502 .map(|(doc_id, _)| doc_id.clone())
503 .collect();
504
505 if !modified_docs.is_empty() {
506 dev_log!("ipc", "Synchronizing {} documents", modified_docs.len());
507
508 sync.last_sync_time =
510 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
511
512 sync.sync_status = Self::calculate_sync_status(&sync.synchronized_documents);
514
515 if std::env::var("LAND_SYNC_STATUS_EMIT").is_ok() {
522 let _ = runtime
523 .Environment
524 .ApplicationHandle
525 .emit("mountain_sync_status_update", sync.sync_status.clone());
526 }
527 }
528 }
529 }
530 });
531 }
532
533 async fn start_performance_monitoring(&self) {
535 let performance_stats = self.performance_stats.clone();
536 let runtime = self.runtime.clone();
537
538 tokio::spawn(async move {
539 let mut interval = interval(Duration::from_secs(10));
540
541 loop {
542 interval.tick().await;
543
544 if let Ok(mut stats) = performance_stats.lock() {
545 stats.last_update =
546 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
547 stats.connection_uptime += 10;
548
549 if std::env::var("LAND_PERF_EMIT").is_ok() {
554 let _ = runtime
555 .Environment
556 .ApplicationHandle
557 .emit("mountain_performance_update", stats.clone());
558 }
559 }
560 }
561 });
562 }
563
564 fn calculate_sync_status(documents:&HashMap<String, SynchronizedDocument>) -> SyncStatus {
566 let total = documents.len() as u32;
567 let synced = documents.values().filter(|d| d.sync_state == SyncState::Synced).count() as u32;
568 let conflicted = documents.values().filter(|d| d.sync_state == SyncState::Conflicted).count() as u32;
569 let offline = documents.values().filter(|d| d.sync_state == SyncState::Offline).count() as u32;
570
571 SyncStatus {
572 total_documents:total,
573 synced_documents:synced,
574 conflicted_documents:conflicted,
575 offline_documents:offline,
576 last_sync_duration_ms:0,
577 }
578 }
579
	/// Hook for registering this service's IPC commands on the Tauri app.
	///
	/// The body only writes a log line; `_app` is unused and no command is
	/// registered here. Always returns `Ok(())`.
	pub fn register_commands(_app:&mut tauri::App) -> Result<(), Box<dyn std::error::Error>> {
		dev_log!("ipc", "Registering Wind Advanced Sync IPC commands");
		Ok(())
	}
585}
586
587impl WindAdvancedSync {
588 pub async fn start_synchronization(self: Arc<Self>) -> Result<(), String> {
590 dev_log!("lifecycle", "Starting advanced synchronization");
591
592 let sync1 = self.clone();
594 tokio::spawn(async move {
595 sync1.synchronize_documents().await;
596 });
597
598 let sync2 = self.clone();
600 tokio::spawn(async move {
601 sync2.synchronize_ui_state().await;
602 });
603
604 let sync3 = self.clone();
606 tokio::spawn(async move {
607 sync3.broadcast_real_time_updates().await;
608 });
609
610 Ok(())
611 }
612
613 async fn synchronize_documents(&self) {
615 let mut interval = interval(Duration::from_secs(5));
616 let mut consecutive_failures = 0;
617 let max_consecutive_failures = 3;
618
619 loop {
620 interval.tick().await;
621
622 dev_log!("lifecycle", "Synchronizing documents");
623
624 let sync_start = std::time::Instant::now();
626 let mut success_count = 0;
627 let mut error_count = 0;
628
629 let changes = self.get_pending_changes().await;
631
632 for change in changes {
634 match self.apply_document_change(change).await {
635 Ok(_) => success_count += 1,
636 Err(e) => {
637 error_count += 1;
638 dev_log!("ipc", "error: [WindAdvancedSync] Failed to apply document change: {}", e);
639
640 consecutive_failures += 1;
642 if consecutive_failures >= max_consecutive_failures {
643 dev_log!("lifecycle", "Too many consecutive failures, slowing sync interval");
644 interval = tokio::time::interval(Duration::from_secs(30));
647 }
648 },
649 }
650 }
651
652 if success_count > 0 {
654 consecutive_failures = 0;
655 interval = tokio::time::interval(Duration::from_secs(5));
657 }
658
659 self.update_sync_status().await;
661
662 let sync_duration = sync_start.elapsed();
664 dev_log!(
665 "ipc",
666 "[WindAdvancedSync] Document sync completed: {} success, {} errors, {:.2}ms",
667 success_count,
668 error_count,
669 sync_duration.as_millis()
670 );
671 }
672 }
673
674 async fn synchronize_ui_state(&self) {
676 let mut interval = interval(Duration::from_secs(1));
677
678 loop {
679 interval.tick().await;
680
681 dev_log!("ipc", "[WindAdvancedSync] Synchronizing UI state");
682
683 let ui_state = self.get_ui_state().await;
685
686 if let Err(e) = self.update_ui_state(ui_state).await {
688 dev_log!("ipc", "error: [WindAdvancedSync] Failed to update UI state: {}", e);
689 }
690 }
691 }
692
693 async fn broadcast_real_time_updates(&self) {
695 let mut interval = interval(Duration::from_millis(100));
696
697 loop {
698 interval.tick().await;
699
700 {
706 let rt = self.real_time_updates.lock().unwrap();
707 if rt.Subscribers.is_empty() {
708 continue;
709 }
710 }
711
712 let updates = self.get_pending_updates().await;
713
714 if !updates.is_empty() {
715 if let Err(e) = self.broadcast_updates(updates).await {
717 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast updates: {}", e);
718 }
719 }
720 }
721 }
722
723 async fn get_pending_changes(&self) -> Vec<DocumentChange> {
725 let sync = self.document_sync.lock().unwrap();
726 sync.pending_changes.values().flatten().cloned().collect()
727 }
728
729 async fn apply_document_change(&self, change:DocumentChange) -> Result<(), String> {
731 dev_log!("lifecycle", "Applying document change: {}", change.change_id);
732
733 let change_start = std::time::Instant::now();
735
736 if let Err(conflict) = self.check_for_conflicts(&change).await {
738 dev_log!("lifecycle", "Conflict detected: {}", conflict);
739 return Err(format!("Conflict detected: {}", conflict));
740 }
741
742 match change.change_type {
744 ChangeType::Update => {
745 if let Some(_content) = &change.content {
747 }
756 },
757 ChangeType::Insert => {
758 if let Some(_content) = &change.content {
760 }
769 },
770 ChangeType::Delete => {
771 },
780 _ => {
781 dev_log!("lifecycle", "Unsupported change type: {:?}", change.change_type);
782 },
783 }
784
785 let mut sync = self.document_sync.lock().unwrap();
787 if let Some(changes) = sync.pending_changes.get_mut(&change.document_id) {
788 if let Some(change_idx) = changes.iter().position(|c| c.change_id == change.change_id) {
789 changes[change_idx].applied = true;
790 }
791 }
792
793 let change_duration = change_start.elapsed();
795 dev_log!(
796 "ipc",
797 "[WindAdvancedSync] Change applied successfully in {:.2}ms: {}",
798 change_duration.as_millis(),
799 change.change_id
800 );
801
802 Ok(())
803 }
804
805 async fn check_for_conflicts(&self, change:&DocumentChange) -> Result<(), String> {
807 let sync = self.document_sync.lock().unwrap();
808
809 if let Some(document) = sync.synchronized_documents.get(&change.document_id) {
811 let current_time = SystemTime::now()
812 .duration_since(SystemTime::UNIX_EPOCH)
813 .unwrap_or_default()
814 .as_secs();
815
816 if current_time - document.last_modified < 10 {
819 return Err(format!(
820 "Document {} was modified recently ({}s ago)",
821 document.document_id,
822 current_time - document.last_modified
823 ));
824 }
825
826 if matches!(document.sync_state, SyncState::Conflicted) {
828 return Err(format!("Document {} is in conflicted state", document.document_id));
829 }
830 }
831
832 Ok(())
833 }
834
835 async fn update_sync_status(&self) {
837 let mut sync = self.document_sync.lock().unwrap();
838
839 sync.sync_status.total_documents = sync.synchronized_documents.len() as u32;
840 sync.sync_status.synced_documents = sync
841 .synchronized_documents
842 .values()
843 .filter(|doc| matches!(doc.sync_state, SyncState::Synced))
844 .count() as u32;
845 sync.sync_status.conflicted_documents = sync
846 .synchronized_documents
847 .values()
848 .filter(|doc| matches!(doc.sync_state, SyncState::Conflicted))
849 .count() as u32;
850 sync.sync_status.offline_documents = sync
851 .synchronized_documents
852 .values()
853 .filter(|doc| matches!(doc.sync_state, SyncState::Offline))
854 .count() as u32;
855
856 sync.last_sync_time = SystemTime::now()
857 .duration_since(SystemTime::UNIX_EPOCH)
858 .unwrap_or_default()
859 .as_secs();
860 }
861
862 async fn get_ui_state(&self) -> UIStateSynchronization {
864 let sync = self.ui_state_sync.lock().unwrap();
865 sync.clone()
866 }
867
868 async fn update_ui_state(&self, ui_state:UIStateSynchronization) -> Result<(), String> {
870 let mut sync = self.ui_state_sync.lock().unwrap();
871 *sync = ui_state;
872
873 Ok(())
879 }
880
881 async fn get_pending_updates(&self) -> Vec<RealTimeUpdate> {
883 let mut updates = self.real_time_updates.lock().unwrap();
884 let pending = updates.UpdateQueue.clone();
885 updates.UpdateQueue.clear();
886 pending
887 }
888
889 async fn broadcast_updates(&self, updates:Vec<RealTimeUpdate>) -> Result<(), String> {
891 for update in updates {
892 let subscribers = {
894 let rt = self.real_time_updates.lock().unwrap();
895 rt.Subscribers.get(&update.target).cloned()
896 };
897
898 if let Some(subscriber_list) = subscribers {
900 for subscriber in subscriber_list {
901 if let Err(e) = self
902 .runtime
903 .Environment
904 .ApplicationHandle
905 .emit(&format!("real-time-update-{}", subscriber), &update)
906 {
907 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast to {}: {}", subscriber, e);
908 }
909 }
910 }
911 }
912
913 Ok(())
914 }
915
916 pub async fn add_document(&self, document_id:String, file_path:String) -> Result<(), String> {
918 let mut sync = self.document_sync.lock().unwrap();
919
920 let document = SynchronizedDocument {
921 document_id:document_id.clone(),
922 file_path,
923 last_modified:SystemTime::now()
924 .duration_since(SystemTime::UNIX_EPOCH)
925 .unwrap_or_default()
926 .as_secs(),
927 content_hash:"".to_string(),
928 sync_state:SyncState::Synced,
929 version:1,
930 };
931
932 sync.synchronized_documents.insert(document_id, document);
933
934 dev_log!("lifecycle", "Document added for synchronization");
935 Ok(())
936 }
937
938 pub async fn subscribe_to_updates(&self, target:String, subscriber:String) -> Result<(), String> {
940 let mut updates = self.real_time_updates.lock().unwrap();
941
942 let target_clone = target.clone();
943 updates
944 .Subscribers
945 .entry(target_clone.clone())
946 .or_insert_with(Vec::new)
947 .push(subscriber);
948
949 dev_log!("lifecycle", "Subscriber added for target: {}", target_clone);
950 Ok(())
951 }
952
953 pub async fn queue_update(&self, update:RealTimeUpdate) -> Result<(), String> {
955 let mut updates = self.real_time_updates.lock().unwrap();
956
957 updates.UpdateQueue.push(update);
958 updates.LastBroadcast = SystemTime::now()
959 .duration_since(SystemTime::UNIX_EPOCH)
960 .unwrap_or_default()
961 .as_secs();
962
963 dev_log!("ipc", "[WindAdvancedSync] Update queued");
964 Ok(())
965 }
966
967 pub async fn get_sync_status(&self) -> SyncStatus {
969 let sync = self.document_sync.lock().unwrap();
970 sync.sync_status.clone()
971 }
972
973 pub async fn get_current_ui_state(&self) -> UIStateSynchronization { self.get_ui_state().await }
975
976 #[allow(dead_code)]
978 fn clone_sync(&self) -> WindAdvancedSync {
979 WindAdvancedSync {
980 runtime:self.runtime.clone(),
981 document_sync:self.document_sync.clone(),
982 ui_state_sync:self.ui_state_sync.clone(),
983 real_time_updates:self.real_time_updates.clone(),
984 performance_stats:self.performance_stats.clone(),
985 }
987 }
988}
989
990#[tauri::command]
992pub async fn mountain_add_document_for_sync(
993 app_handle:tauri::AppHandle,
994 document_id:String,
995 file_path:String,
996) -> Result<(), String> {
997 dev_log!("lifecycle", "Tauri command: add_document_for_sync");
998
999 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1000 sync.add_document(document_id, file_path).await
1001 } else {
1002 Err("WindAdvancedSync not found in application state".to_string())
1003 }
1004}
1005
1006#[tauri::command]
1008pub async fn mountain_get_sync_status(app_handle:tauri::AppHandle) -> Result<SyncStatus, String> {
1009 dev_log!("lifecycle", "Tauri command: get_sync_status");
1010
1011 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1012 Ok(sync.get_sync_status().await)
1013 } else {
1014 Err("WindAdvancedSync not found in application state".to_string())
1015 }
1016}
1017
1018#[tauri::command]
1020pub async fn mountain_subscribe_to_updates(
1021 app_handle:tauri::AppHandle,
1022 target:String,
1023 subscriber:String,
1024) -> Result<(), String> {
1025 dev_log!("lifecycle", "Tauri command: subscribe_to_updates");
1026
1027 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1028 sync.subscribe_to_updates(target, subscriber).await
1029 } else {
1030 Err("WindAdvancedSync not found in application state".to_string())
1031 }
1032}
1033
1034pub fn initialize_wind_advanced_sync(
1036 app_handle:&tauri::AppHandle,
1037 runtime:Arc<ApplicationRunTime>,
1038) -> Result<(), String> {
1039 dev_log!("lifecycle", "Initializing Wind advanced synchronization");
1040
1041 let sync = Arc::new(WindAdvancedSync::new(runtime));
1042
1043 app_handle.manage(sync.clone());
1045
1046 let sync_clone = sync.clone();
1048 tokio::spawn(async move {
1049 if let Err(e) = sync_clone.start_synchronization().await {
1050 dev_log!("ipc", "error: [WindAdvancedSync] Failed to start synchronization: {}", e);
1051 }
1052 });
1053
1054 Ok(())
1055}