Skip to main content

Mountain/IPC/AdvancedFeatures/
Features.rs

1#![allow(non_snake_case)]
2
3//! `AdvancedFeatures` aggregator - holds the runtime handle,
4//! cumulative `PerformanceStats::Struct`, the realtime
5//! collaboration-session map, and the
6//! `MessageCache::Struct`. Spawns three monitor tasks
7//! (`monitor_performance`, `cleanup_cache`,
8//! `monitor_collaboration_sessions`) on `start_monitoring`.
//! The 16-method impl is kept in one file because the methods form a
//! tightly-coupled cluster.
11
12use std::{
13	collections::HashMap,
14	sync::{Arc, Mutex},
15	time::{Duration, SystemTime},
16};
17
18use tauri::Emitter;
19use tokio::time::interval;
20
21use crate::{
22	IPC::AdvancedFeatures::{
23		CachedMessage::Struct as CachedMessage,
24		CollaborationPermissions::Struct as CollaborationPermissions,
25		CollaborationSession::Struct as CollaborationSession,
26		MessageCache::Struct as MessageCache,
27		PerformanceStats::Struct as PerformanceStats,
28	},
29	RunTime::ApplicationRunTime::ApplicationRunTime,
30	dev_log,
31};
32
33#[derive(Clone)]
34pub struct Struct {
35	pub(super) runtime:Arc<ApplicationRunTime>,
36	pub(super) performance_stats:Arc<Mutex<PerformanceStats>>,
37	pub(super) collaboration_sessions:Arc<Mutex<HashMap<String, CollaborationSession>>>,
38	pub(super) message_cache:Arc<Mutex<MessageCache>>,
39}
40
41impl Struct {
42	pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
43		dev_log!("lifecycle", "Initializing advanced IPC features");
44
45		Self {
46			runtime,
47			performance_stats:Arc::new(Mutex::new(PerformanceStats {
48				total_messages_sent:0,
49				total_messages_received:0,
50				average_processing_time_ms:0.0,
51				peak_message_rate:0,
52				error_count:0,
53				last_update:SystemTime::now()
54					.duration_since(SystemTime::UNIX_EPOCH)
55					.unwrap_or_default()
56					.as_secs(),
57				connection_uptime:0,
58			})),
59			collaboration_sessions:Arc::new(Mutex::new(HashMap::new())),
60			message_cache:Arc::new(Mutex::new(MessageCache {
61				cached_messages:HashMap::new(),
62				cache_hits:0,
63				cache_misses:0,
64				cache_size:0,
65			})),
66		}
67	}
68
69	pub async fn start_monitoring(&self) -> Result<(), String> {
70		dev_log!("lifecycle", "Starting advanced monitoring");
71
72		let features1 = self.clone_features();
73		let features2 = self.clone_features();
74		let features3 = self.clone_features();
75
76		tokio::spawn(async move {
77			features1.monitor_performance().await;
78		});
79		tokio::spawn(async move {
80			features2.cleanup_cache().await;
81		});
82		tokio::spawn(async move {
83			features3.monitor_collaboration_sessions().await;
84		});
85
86		Ok(())
87	}
88
89	async fn monitor_performance(&self) {
90		let mut interval = interval(Duration::from_secs(10));
91
92		loop {
93			interval.tick().await;
94
95			let stats = self.calculate_performance_stats().await;
96
97			if let Err(e) = self.runtime.Environment.ApplicationHandle.emit("ipc-performance-stats", &stats) {
98				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit performance stats: {}", e);
99			}
100
101			dev_log!("lifecycle", "Performance stats updated");
102		}
103	}
104
105	async fn calculate_performance_stats(&self) -> PerformanceStats {
106		let mut stats = self.performance_stats.lock().unwrap();
107
108		stats.connection_uptime = SystemTime::now()
109			.duration_since(SystemTime::UNIX_EPOCH)
110			.unwrap_or_default()
111			.as_secs()
112			- stats.last_update;
113
114		stats.last_update = SystemTime::now()
115			.duration_since(SystemTime::UNIX_EPOCH)
116			.unwrap_or_default()
117			.as_secs();
118
119		stats.clone()
120	}
121
122	async fn cleanup_cache(&self) {
123		let mut interval = interval(Duration::from_secs(60));
124
125		loop {
126			interval.tick().await;
127
128			let current_time = SystemTime::now()
129				.duration_since(SystemTime::UNIX_EPOCH)
130				.unwrap_or_default()
131				.as_secs();
132
133			let mut cache = self.message_cache.lock().unwrap();
134
135			cache
136				.cached_messages
137				.retain(|_, cached_message| current_time < cached_message.timestamp + cached_message.ttl);
138
139			cache.cache_size = cache.cached_messages.len();
140
141			dev_log!("lifecycle", "Cache cleaned, {} entries remaining", cache.cache_size);
142		}
143	}
144
145	async fn monitor_collaboration_sessions(&self) {
146		let mut interval = interval(Duration::from_secs(30));
147
148		loop {
149			interval.tick().await;
150
151			let current_time = SystemTime::now()
152				.duration_since(SystemTime::UNIX_EPOCH)
153				.unwrap_or_default()
154				.as_secs();
155
156			let mut sessions = self.collaboration_sessions.lock().unwrap();
157
158			sessions.retain(|_, session| current_time - session.last_activity < 300);
159
160			let active_sessions:Vec<CollaborationSession> = sessions.values().cloned().collect();
161
162			if let Err(e) = self
163				.runtime
164				.Environment
165				.ApplicationHandle
166				.emit("collaboration-sessions-update", &active_sessions)
167			{
168				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit collaboration sessions: {}", e);
169			}
170
171			dev_log!("lifecycle", "Collaboration sessions monitored, {} active", sessions.len());
172		}
173	}
174
175	pub async fn cache_message(&self, message_id:String, data:serde_json::Value, ttl:u64) -> Result<(), String> {
176		let mut cache = self
177			.message_cache
178			.lock()
179			.map_err(|e| format!("Failed to access message cache: {}", e))?;
180
181		let cached_message = CachedMessage {
182			data,
183			timestamp:SystemTime::now()
184				.duration_since(SystemTime::UNIX_EPOCH)
185				.unwrap_or_default()
186				.as_secs(),
187			ttl,
188		};
189
190		cache.cached_messages.insert(message_id.clone(), cached_message);
191		cache.cache_size = cache.cached_messages.len();
192
193		dev_log!("lifecycle", "Message cached: {}, TTL: {}s", message_id, ttl);
194		Ok(())
195	}
196
197	pub async fn get_cached_message(&self, message_id:&str) -> Option<serde_json::Value> {
198		let mut cache = self.message_cache.lock().unwrap();
199
200		let result = cache
201			.cached_messages
202			.get(message_id)
203			.map(|cached_message| cached_message.data.clone());
204
205		if result.is_some() {
206			cache.cache_hits += 1;
207		} else {
208			cache.cache_misses += 1;
209		}
210
211		result
212	}
213
214	pub async fn create_collaboration_session(
215		&self,
216		session_id:String,
217		permissions:CollaborationPermissions,
218	) -> Result<(), String> {
219		let mut sessions = self
220			.collaboration_sessions
221			.lock()
222			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
223
224		let session = CollaborationSession {
225			session_id:session_id.clone(),
226			participants:Vec::new(),
227			active_documents:Vec::new(),
228			last_activity:SystemTime::now()
229				.duration_since(SystemTime::UNIX_EPOCH)
230				.unwrap_or_default()
231				.as_secs(),
232			permissions,
233		};
234
235		sessions.insert(session_id, session);
236
237		dev_log!("lifecycle", "Collaboration session created");
238		Ok(())
239	}
240
241	pub async fn add_participant(&self, session_id:&str, participant:String) -> Result<(), String> {
242		let mut sessions = self
243			.collaboration_sessions
244			.lock()
245			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
246
247		if let Some(session) = sessions.get_mut(session_id) {
248			if !session.participants.contains(&participant) {
249				session.participants.push(participant);
250				session.last_activity = SystemTime::now()
251					.duration_since(SystemTime::UNIX_EPOCH)
252					.unwrap_or_default()
253					.as_secs();
254
255				dev_log!("lifecycle", "Participant added to session: {}", session_id);
256			}
257		} else {
258			return Err(format!("Session not found: {}", session_id));
259		}
260
261		Ok(())
262	}
263
264	pub async fn record_message_statistics(&self, sent:bool, processing_time_ms:u64) {
265		let mut stats = self.performance_stats.lock().unwrap();
266
267		if sent {
268			stats.total_messages_sent += 1;
269		} else {
270			stats.total_messages_received += 1;
271		}
272
273		let total_messages = stats.total_messages_sent + stats.total_messages_received;
274		stats.average_processing_time_ms = (stats.average_processing_time_ms * (total_messages - 1) as f64
275			+ processing_time_ms as f64)
276			/ total_messages as f64;
277	}
278
279	pub async fn record_error(&self) {
280		let mut stats = self.performance_stats.lock().unwrap();
281		stats.error_count += 1;
282	}
283
284	pub async fn get_performance_stats(&self) -> Result<PerformanceStats, String> {
285		Ok(self.calculate_performance_stats().await)
286	}
287
288	pub async fn get_cache_stats(&self) -> Result<MessageCache, String> {
289		let cache = self.message_cache.lock().unwrap();
290		Ok(cache.clone())
291	}
292
293	pub async fn get_collaboration_sessions(&self) -> Vec<CollaborationSession> {
294		let sessions = self.collaboration_sessions.lock().unwrap();
295		sessions.values().cloned().collect()
296	}
297
298	pub(super) fn clone_features(&self) -> Self {
299		Self {
300			runtime:self.runtime.clone(),
301			performance_stats:self.performance_stats.clone(),
302			collaboration_sessions:self.collaboration_sessions.clone(),
303			message_cache:self.message_cache.clone(),
304		}
305	}
306}