// Mountain/IPC/Enhanced/PerformanceDashboard/Dashboard.rs
1#![allow(non_snake_case)]
2
3//! `PerformanceDashboard` aggregator + 25-method impl. Holds
4//! the metric ring-buffer, trace store, alert ring, statistics
5//! cell, and the `is_running` lifecycle flag. Method bodies
6//! tightly couple with the sibling DTOs so the impl stays a
7//! single file (per the "tightly-coupled cluster" exception).
8
9use std::{
10	collections::{HashMap, VecDeque},
11	sync::Arc,
12	time::{Duration, SystemTime},
13};
14
15use tokio::{
16	sync::{Mutex as AsyncMutex, RwLock},
17	time::interval,
18};
19
20use crate::{
21	IPC::Enhanced::PerformanceDashboard::{
22		AlertSeverity::Enum as AlertSeverity,
23		DashboardConfig::Struct as DashboardConfig,
24		DashboardStatistics::Struct as DashboardStatistics,
25		LogLevel::Enum as LogLevel,
26		MetricType::Enum as MetricType,
27		PerformanceAlert::Struct as PerformanceAlert,
28		PerformanceMetric::Struct as PerformanceMetric,
29		TraceLog::Struct as TraceLog,
30		TraceSpan::Struct as TraceSpan,
31	},
32	dev_log,
33};
34
/// Shared state for the IPC performance dashboard.
pub struct Struct {
	// Immutable configuration captured at construction.
	pub(super) config:DashboardConfig,
	// Raw metric samples; appended by `record_metric`, trimmed by `cleanup_old_data`.
	pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,
	// Trace spans keyed by span id (open and finished).
	pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,
	// Triggered alerts, newest at the back.
	pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,
	// Rolled-up aggregates refreshed by `update_statistics`.
	pub(super) statistics:Arc<RwLock<DashboardStatistics>>,
	// Lifecycle flag polled by the background tasks spawned in `start`.
	pub(super) is_running:Arc<AsyncMutex<bool>>,
}
43
44impl Struct {
45	pub fn new(config:DashboardConfig) -> Self {
46		let config_clone = config.clone();
47		let dashboard = Self {
48			config,
49			metrics:Arc::new(RwLock::new(VecDeque::new())),
50			traces:Arc::new(RwLock::new(HashMap::new())),
51			alerts:Arc::new(RwLock::new(VecDeque::new())),
52			statistics:Arc::new(RwLock::new(DashboardStatistics {
53				total_metrics_collected:0,
54				total_traces_collected:0,
55				total_alerts_triggered:0,
56				average_processing_time_ms:0.0,
57				peak_processing_time_ms:0,
58				error_rate_percentage:0.0,
59				throughput_messages_per_second:0.0,
60				memory_usage_mb:0.0,
61				last_update:SystemTime::now()
62					.duration_since(SystemTime::UNIX_EPOCH)
63					.unwrap_or_default()
64					.as_secs(),
65			})),
66			is_running:Arc::new(AsyncMutex::new(false)),
67		};
68
69		dev_log!(
70			"ipc",
71			"[PerformanceDashboard] Created dashboard with {}ms update interval",
72			config_clone.update_interval_ms
73		);
74
75		dashboard
76	}
77
78	pub async fn start(&self) -> Result<(), String> {
79		{
80			let mut running = self.is_running.lock().await;
81			if *running {
82				return Ok(());
83			}
84			*running = true;
85		}
86
87		self.start_metrics_collection().await;
88		self.start_alert_monitoring().await;
89		self.start_data_cleanup().await;
90
91		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");
92		Ok(())
93	}
94
95	pub async fn stop(&self) -> Result<(), String> {
96		{
97			let mut running = self.is_running.lock().await;
98			if !*running {
99				return Ok(());
100			}
101			*running = false;
102		}
103
104		{
105			let mut metrics = self.metrics.write().await;
106			metrics.clear();
107		}
108
109		{
110			let mut traces = self.traces.write().await;
111			traces.clear();
112		}
113
114		{
115			let mut alerts = self.alerts.write().await;
116			alerts.clear();
117		}
118
119		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");
120		Ok(())
121	}
122
123	pub async fn record_metric(&self, metric:PerformanceMetric) {
124		let mut metrics = self.metrics.write().await;
125		metrics.push_back(metric.clone());
126
127		drop(metrics);
128
129		self.update_statistics().await;
130		self.check_alerts(&metric).await;
131
132		dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
133	}
134
135	pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
136		let trace_id = Self::generate_trace_id();
137		let span_id = Self::generate_span_id();
138
139		let span = TraceSpan {
140			trace_id:trace_id.clone(),
141			span_id:span_id.clone(),
142			parent_span_id:None,
143			operation_name,
144			start_time:SystemTime::now()
145				.duration_since(SystemTime::UNIX_EPOCH)
146				.unwrap_or_default()
147				.as_millis() as u64,
148			end_time:None,
149			duration_ms:None,
150			tags:HashMap::new(),
151			logs:Vec::new(),
152		};
153
154		{
155			let mut traces = self.traces.write().await;
156			traces.insert(span_id.clone(), span.clone());
157		}
158
159		{
160			let mut stats = self.statistics.write().await;
161			stats.total_traces_collected += 1;
162		}
163
164		span
165	}
166
167	pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
168		let mut traces = self.traces.write().await;
169
170		if let Some(span) = traces.get_mut(span_id) {
171			let end_time = SystemTime::now()
172				.duration_since(SystemTime::UNIX_EPOCH)
173				.unwrap_or_default()
174				.as_millis() as u64;
175
176			span.end_time = Some(end_time);
177			span.duration_ms = Some(end_time.saturating_sub(span.start_time));
178
179			dev_log!(
180				"ipc",
181				"[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
182				span.operation_name,
183				span.duration_ms.unwrap_or(0)
184			);
185
186			Ok(())
187		} else {
188			Err(format!("Trace span not found: {}", span_id))
189		}
190	}
191
192	pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
193		let mut traces = self.traces.write().await;
194
195		if let Some(span) = traces.get_mut(span_id) {
196			span.logs.push(log);
197			Ok(())
198		} else {
199			Err(format!("Trace span not found: {}", span_id))
200		}
201	}
202
203	async fn start_metrics_collection(&self) {
204		let dashboard = Arc::new(self.clone());
205
206		tokio::spawn(async move {
207			let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));
208
209			while *dashboard.is_running.lock().await {
210				interval.tick().await;
211				dashboard.collect_system_metrics().await;
212				dashboard.update_statistics().await;
213			}
214		});
215	}
216
217	async fn start_alert_monitoring(&self) {
218		let dashboard = Arc::new(self.clone());
219
220		tokio::spawn(async move {
221			let mut interval = interval(Duration::from_secs(10));
222
223			while *dashboard.is_running.lock().await {
224				interval.tick().await;
225				dashboard.check_performance_alerts().await;
226			}
227		});
228	}
229
230	async fn start_data_cleanup(&self) {
231		let dashboard = Arc::new(self.clone());
232
233		tokio::spawn(async move {
234			let mut interval = interval(Duration::from_secs(3600));
235
236			while *dashboard.is_running.lock().await {
237				interval.tick().await;
238				dashboard.cleanup_old_data().await;
239			}
240		});
241	}
242
243	async fn collect_system_metrics(&self) {
244		if let Ok(memory_usage) = Self::get_memory_usage() {
245			let metric = PerformanceMetric {
246				metric_type:MetricType::MemoryUsage,
247				value:memory_usage,
248				timestamp:SystemTime::now()
249					.duration_since(SystemTime::UNIX_EPOCH)
250					.unwrap_or_default()
251					.as_millis() as u64,
252				channel:None,
253				tags:HashMap::new(),
254			};
255			self.record_metric(metric).await;
256		}
257
258		if let Ok(cpu_usage) = Self::get_cpu_usage() {
259			let metric = PerformanceMetric {
260				metric_type:MetricType::CpuUsage,
261				value:cpu_usage,
262				timestamp:SystemTime::now()
263					.duration_since(SystemTime::UNIX_EPOCH)
264					.unwrap_or_default()
265					.as_millis() as u64,
266				channel:None,
267				tags:HashMap::new(),
268			};
269			self.record_metric(metric).await;
270		}
271	}
272
273	async fn update_statistics(&self) {
274		let metrics = self.metrics.read().await;
275		let mut stats = self.statistics.write().await;
276
277		let processing_metrics:Vec<&PerformanceMetric> = metrics
278			.iter()
279			.filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
280			.collect();
281
282		if !processing_metrics.is_empty() {
283			let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
284			stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
285			stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
286		}
287
288		let error_metrics:Vec<&PerformanceMetric> = metrics
289			.iter()
290			.filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
291			.collect();
292
293		if !error_metrics.is_empty() {
294			let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
295			stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
296		}
297
298		let throughput_metrics:Vec<&PerformanceMetric> = metrics
299			.iter()
300			.filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
301			.collect();
302
303		if !throughput_metrics.is_empty() {
304			let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
305			stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
306		}
307
308		let memory_metrics:Vec<&PerformanceMetric> = metrics
309			.iter()
310			.filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
311			.collect();
312
313		if !memory_metrics.is_empty() {
314			let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
315			stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
316		}
317
318		stats.last_update = SystemTime::now()
319			.duration_since(SystemTime::UNIX_EPOCH)
320			.unwrap_or_default()
321			.as_secs();
322	}
323
324	async fn check_alerts(&self, metric:&PerformanceMetric) {
325		let threshold = match metric.metric_type {
326			MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
327			MetricType::ErrorRate => 5.0,
328			MetricType::MemoryUsage => 1024.0,
329			MetricType::CpuUsage => 90.0,
330			_ => return,
331		};
332
333		if metric.value > threshold {
334			let severity = match metric.value / threshold {
335				ratio if ratio > 5.0 => AlertSeverity::Critical,
336				ratio if ratio > 3.0 => AlertSeverity::High,
337				ratio if ratio > 2.0 => AlertSeverity::Medium,
338				_ => AlertSeverity::Low,
339			};
340
341			let alert = PerformanceAlert {
342				alert_id:Self::generate_alert_id(),
343				metric_type:metric.metric_type.clone(),
344				threshold,
345				current_value:metric.value,
346				timestamp:metric.timestamp,
347				channel:metric.channel.clone(),
348				severity,
349				message:format!(
350					"{} exceeded threshold: {} > {}",
351					Self::metric_type_name(&metric.metric_type),
352					metric.value,
353					threshold
354				),
355			};
356
357			{
358				let mut alerts = self.alerts.write().await;
359				alerts.push_back(alert.clone());
360			}
361
362			{
363				let mut stats = self.statistics.write().await;
364				stats.total_alerts_triggered += 1;
365			}
366
367			dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
368		}
369	}
370
	// Periodic sweep invoked by `start_alert_monitoring` every 10s.
	// Currently a placeholder that only logs; per-metric alerting happens
	// synchronously in `check_alerts` when a metric is recorded.
	async fn check_performance_alerts(&self) {
		dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
	}
374
375	async fn cleanup_old_data(&self) {
376		let retention_threshold = SystemTime::now()
377			.duration_since(SystemTime::UNIX_EPOCH)
378			.unwrap_or_default()
379			.as_secs()
380			- (self.config.metrics_retention_hours * 3600);
381
382		{
383			let mut metrics = self.metrics.write().await;
384			metrics.retain(|m| m.timestamp >= retention_threshold);
385		}
386
387		{
388			let mut traces = self.traces.write().await;
389			traces.retain(|_, span| span.start_time >= retention_threshold);
390
391			if traces.len() > self.config.max_traces_stored {
392				let excess = traces.len() - self.config.max_traces_stored;
393				let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
394
395				for key in keys_to_remove {
396					traces.remove(&key);
397				}
398			}
399		}
400
401		{
402			let mut alerts = self.alerts.write().await;
403			alerts.retain(|a| a.timestamp >= retention_threshold);
404		}
405
406		dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
407	}
408
	// Placeholder sampler: always reports 100.0 — real memory probing is not
	// implemented. Presumably megabytes (see `memory_usage_mb`) — TODO confirm.
	fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }
410
	// Placeholder sampler: always reports 25.0 — real CPU probing is not
	// implemented. Presumably percent (alert threshold is 90.0) — TODO confirm.
	fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }
412
	// Random v4 UUID string used as a trace identifier.
	fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }
414
	// Random v4 UUID string used as a span identifier (also the trace-store key).
	fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }
416
	// Random v4 UUID string used as an alert identifier.
	fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
418
	// Human-readable label for a metric type, used in alert messages.
	fn metric_type_name(metric_type:&MetricType) -> &'static str {
		match metric_type {
			MetricType::MessageProcessingTime => "Message Processing Time",
			MetricType::ConnectionLatency => "Connection Latency",
			MetricType::MemoryUsage => "Memory Usage",
			MetricType::CpuUsage => "CPU Usage",
			MetricType::NetworkThroughput => "Network Throughput",
			MetricType::ErrorRate => "Error Rate",
			MetricType::QueueSize => "Queue Size",
		}
	}
430
	// Snapshot of the current aggregate statistics, cloned under the read lock.
	pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
432
433	pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
434		let metrics = self.metrics.read().await;
435		metrics.iter().rev().take(limit).cloned().collect()
436	}
437
438	pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
439		let alerts = self.alerts.read().await;
440		alerts.iter().rev().cloned().collect()
441	}
442
443	pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
444		let traces = self.traces.read().await;
445		traces.values().find(|span| span.trace_id == trace_id).cloned()
446	}
447
	// Convenience constructor using `DashboardConfig::default()`.
	pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
449
	// Preset tuned for rapid sampling: 1s updates, 1h retention, 500ms alert
	// threshold, full trace sampling, 5000-trace cap.
	pub fn high_frequency_dashboard() -> Self {
		Self::new(DashboardConfig {
			update_interval_ms:1000,
			metrics_retention_hours:1,
			alert_threshold_ms:500,
			trace_sampling_rate:1.0,
			max_traces_stored:5000,
		})
	}
459
460	pub fn create_metric(
461		metric_type:MetricType,
462		value:f64,
463		channel:Option<String>,
464		tags:HashMap<String, String>,
465	) -> PerformanceMetric {
466		PerformanceMetric {
467			metric_type,
468			value,
469			timestamp:SystemTime::now()
470				.duration_since(SystemTime::UNIX_EPOCH)
471				.unwrap_or_default()
472				.as_millis() as u64,
473			channel,
474			tags,
475		}
476	}
477
478	pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
479		TraceLog {
480			timestamp:SystemTime::now()
481				.duration_since(SystemTime::UNIX_EPOCH)
482				.unwrap_or_default()
483				.as_millis() as u64,
484			message,
485			level,
486			fields,
487		}
488	}
489
490	pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
491		let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
492		let error_score = 100.0 * (1.0 - error_rate / 100.0);
493		let throughput_score = throughput / 1000.0;
494
495		(time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
496			.max(0.0)
497			.min(100.0)
498	}
499
500	pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
501		match metric_type {
502			MetricType::MessageProcessingTime => format!("{:.2}ms", value),
503			MetricType::ConnectionLatency => format!("{:.2}ms", value),
504			MetricType::MemoryUsage => format!("{:.2}MB", value),
505			MetricType::CpuUsage => format!("{:.2}%", value),
506			MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
507			MetricType::ErrorRate => format!("{:.2}%", value),
508			MetricType::QueueSize => format!("{:.0}", value),
509		}
510	}
511}
512
513impl Clone for Struct {
514	fn clone(&self) -> Self {
515		Self {
516			config:self.config.clone(),
517			metrics:self.metrics.clone(),
518			traces:self.traces.clone(),
519			alerts:self.alerts.clone(),
520			statistics:self.statistics.clone(),
521			is_running:Arc::new(AsyncMutex::new(false)),
522		}
523	}
524}