Mountain/IPC/Enhanced/PerformanceDashboard/
Dashboard.rs1#![allow(non_snake_case)]
2
3use std::{
10 collections::{HashMap, VecDeque},
11 sync::Arc,
12 time::{Duration, SystemTime},
13};
14
15use tokio::{
16 sync::{Mutex as AsyncMutex, RwLock},
17 time::interval,
18};
19
20use crate::{
21 IPC::Enhanced::PerformanceDashboard::{
22 AlertSeverity::Enum as AlertSeverity,
23 DashboardConfig::Struct as DashboardConfig,
24 DashboardStatistics::Struct as DashboardStatistics,
25 LogLevel::Enum as LogLevel,
26 MetricType::Enum as MetricType,
27 PerformanceAlert::Struct as PerformanceAlert,
28 PerformanceMetric::Struct as PerformanceMetric,
29 TraceLog::Struct as TraceLog,
30 TraceSpan::Struct as TraceSpan,
31 },
32 dev_log,
33};
34
35pub struct Struct {
36 pub(super) config:DashboardConfig,
37 pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,
38 pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,
39 pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,
40 pub(super) statistics:Arc<RwLock<DashboardStatistics>>,
41 pub(super) is_running:Arc<AsyncMutex<bool>>,
42}
43
44impl Struct {
45 pub fn new(config:DashboardConfig) -> Self {
46 let config_clone = config.clone();
47 let dashboard = Self {
48 config,
49 metrics:Arc::new(RwLock::new(VecDeque::new())),
50 traces:Arc::new(RwLock::new(HashMap::new())),
51 alerts:Arc::new(RwLock::new(VecDeque::new())),
52 statistics:Arc::new(RwLock::new(DashboardStatistics {
53 total_metrics_collected:0,
54 total_traces_collected:0,
55 total_alerts_triggered:0,
56 average_processing_time_ms:0.0,
57 peak_processing_time_ms:0,
58 error_rate_percentage:0.0,
59 throughput_messages_per_second:0.0,
60 memory_usage_mb:0.0,
61 last_update:SystemTime::now()
62 .duration_since(SystemTime::UNIX_EPOCH)
63 .unwrap_or_default()
64 .as_secs(),
65 })),
66 is_running:Arc::new(AsyncMutex::new(false)),
67 };
68
69 dev_log!(
70 "ipc",
71 "[PerformanceDashboard] Created dashboard with {}ms update interval",
72 config_clone.update_interval_ms
73 );
74
75 dashboard
76 }
77
78 pub async fn start(&self) -> Result<(), String> {
79 {
80 let mut running = self.is_running.lock().await;
81 if *running {
82 return Ok(());
83 }
84 *running = true;
85 }
86
87 self.start_metrics_collection().await;
88 self.start_alert_monitoring().await;
89 self.start_data_cleanup().await;
90
91 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");
92 Ok(())
93 }
94
95 pub async fn stop(&self) -> Result<(), String> {
96 {
97 let mut running = self.is_running.lock().await;
98 if !*running {
99 return Ok(());
100 }
101 *running = false;
102 }
103
104 {
105 let mut metrics = self.metrics.write().await;
106 metrics.clear();
107 }
108
109 {
110 let mut traces = self.traces.write().await;
111 traces.clear();
112 }
113
114 {
115 let mut alerts = self.alerts.write().await;
116 alerts.clear();
117 }
118
119 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");
120 Ok(())
121 }
122
123 pub async fn record_metric(&self, metric:PerformanceMetric) {
124 let mut metrics = self.metrics.write().await;
125 metrics.push_back(metric.clone());
126
127 drop(metrics);
128
129 self.update_statistics().await;
130 self.check_alerts(&metric).await;
131
132 dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
133 }
134
135 pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
136 let trace_id = Self::generate_trace_id();
137 let span_id = Self::generate_span_id();
138
139 let span = TraceSpan {
140 trace_id:trace_id.clone(),
141 span_id:span_id.clone(),
142 parent_span_id:None,
143 operation_name,
144 start_time:SystemTime::now()
145 .duration_since(SystemTime::UNIX_EPOCH)
146 .unwrap_or_default()
147 .as_millis() as u64,
148 end_time:None,
149 duration_ms:None,
150 tags:HashMap::new(),
151 logs:Vec::new(),
152 };
153
154 {
155 let mut traces = self.traces.write().await;
156 traces.insert(span_id.clone(), span.clone());
157 }
158
159 {
160 let mut stats = self.statistics.write().await;
161 stats.total_traces_collected += 1;
162 }
163
164 span
165 }
166
167 pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
168 let mut traces = self.traces.write().await;
169
170 if let Some(span) = traces.get_mut(span_id) {
171 let end_time = SystemTime::now()
172 .duration_since(SystemTime::UNIX_EPOCH)
173 .unwrap_or_default()
174 .as_millis() as u64;
175
176 span.end_time = Some(end_time);
177 span.duration_ms = Some(end_time.saturating_sub(span.start_time));
178
179 dev_log!(
180 "ipc",
181 "[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
182 span.operation_name,
183 span.duration_ms.unwrap_or(0)
184 );
185
186 Ok(())
187 } else {
188 Err(format!("Trace span not found: {}", span_id))
189 }
190 }
191
192 pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
193 let mut traces = self.traces.write().await;
194
195 if let Some(span) = traces.get_mut(span_id) {
196 span.logs.push(log);
197 Ok(())
198 } else {
199 Err(format!("Trace span not found: {}", span_id))
200 }
201 }
202
203 async fn start_metrics_collection(&self) {
204 let dashboard = Arc::new(self.clone());
205
206 tokio::spawn(async move {
207 let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));
208
209 while *dashboard.is_running.lock().await {
210 interval.tick().await;
211 dashboard.collect_system_metrics().await;
212 dashboard.update_statistics().await;
213 }
214 });
215 }
216
217 async fn start_alert_monitoring(&self) {
218 let dashboard = Arc::new(self.clone());
219
220 tokio::spawn(async move {
221 let mut interval = interval(Duration::from_secs(10));
222
223 while *dashboard.is_running.lock().await {
224 interval.tick().await;
225 dashboard.check_performance_alerts().await;
226 }
227 });
228 }
229
230 async fn start_data_cleanup(&self) {
231 let dashboard = Arc::new(self.clone());
232
233 tokio::spawn(async move {
234 let mut interval = interval(Duration::from_secs(3600));
235
236 while *dashboard.is_running.lock().await {
237 interval.tick().await;
238 dashboard.cleanup_old_data().await;
239 }
240 });
241 }
242
243 async fn collect_system_metrics(&self) {
244 if let Ok(memory_usage) = Self::get_memory_usage() {
245 let metric = PerformanceMetric {
246 metric_type:MetricType::MemoryUsage,
247 value:memory_usage,
248 timestamp:SystemTime::now()
249 .duration_since(SystemTime::UNIX_EPOCH)
250 .unwrap_or_default()
251 .as_millis() as u64,
252 channel:None,
253 tags:HashMap::new(),
254 };
255 self.record_metric(metric).await;
256 }
257
258 if let Ok(cpu_usage) = Self::get_cpu_usage() {
259 let metric = PerformanceMetric {
260 metric_type:MetricType::CpuUsage,
261 value:cpu_usage,
262 timestamp:SystemTime::now()
263 .duration_since(SystemTime::UNIX_EPOCH)
264 .unwrap_or_default()
265 .as_millis() as u64,
266 channel:None,
267 tags:HashMap::new(),
268 };
269 self.record_metric(metric).await;
270 }
271 }
272
273 async fn update_statistics(&self) {
274 let metrics = self.metrics.read().await;
275 let mut stats = self.statistics.write().await;
276
277 let processing_metrics:Vec<&PerformanceMetric> = metrics
278 .iter()
279 .filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
280 .collect();
281
282 if !processing_metrics.is_empty() {
283 let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
284 stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
285 stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
286 }
287
288 let error_metrics:Vec<&PerformanceMetric> = metrics
289 .iter()
290 .filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
291 .collect();
292
293 if !error_metrics.is_empty() {
294 let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
295 stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
296 }
297
298 let throughput_metrics:Vec<&PerformanceMetric> = metrics
299 .iter()
300 .filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
301 .collect();
302
303 if !throughput_metrics.is_empty() {
304 let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
305 stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
306 }
307
308 let memory_metrics:Vec<&PerformanceMetric> = metrics
309 .iter()
310 .filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
311 .collect();
312
313 if !memory_metrics.is_empty() {
314 let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
315 stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
316 }
317
318 stats.last_update = SystemTime::now()
319 .duration_since(SystemTime::UNIX_EPOCH)
320 .unwrap_or_default()
321 .as_secs();
322 }
323
324 async fn check_alerts(&self, metric:&PerformanceMetric) {
325 let threshold = match metric.metric_type {
326 MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
327 MetricType::ErrorRate => 5.0,
328 MetricType::MemoryUsage => 1024.0,
329 MetricType::CpuUsage => 90.0,
330 _ => return,
331 };
332
333 if metric.value > threshold {
334 let severity = match metric.value / threshold {
335 ratio if ratio > 5.0 => AlertSeverity::Critical,
336 ratio if ratio > 3.0 => AlertSeverity::High,
337 ratio if ratio > 2.0 => AlertSeverity::Medium,
338 _ => AlertSeverity::Low,
339 };
340
341 let alert = PerformanceAlert {
342 alert_id:Self::generate_alert_id(),
343 metric_type:metric.metric_type.clone(),
344 threshold,
345 current_value:metric.value,
346 timestamp:metric.timestamp,
347 channel:metric.channel.clone(),
348 severity,
349 message:format!(
350 "{} exceeded threshold: {} > {}",
351 Self::metric_type_name(&metric.metric_type),
352 metric.value,
353 threshold
354 ),
355 };
356
357 {
358 let mut alerts = self.alerts.write().await;
359 alerts.push_back(alert.clone());
360 }
361
362 {
363 let mut stats = self.statistics.write().await;
364 stats.total_alerts_triggered += 1;
365 }
366
367 dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
368 }
369 }
370
371 async fn check_performance_alerts(&self) {
372 dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
373 }
374
375 async fn cleanup_old_data(&self) {
376 let retention_threshold = SystemTime::now()
377 .duration_since(SystemTime::UNIX_EPOCH)
378 .unwrap_or_default()
379 .as_secs()
380 - (self.config.metrics_retention_hours * 3600);
381
382 {
383 let mut metrics = self.metrics.write().await;
384 metrics.retain(|m| m.timestamp >= retention_threshold);
385 }
386
387 {
388 let mut traces = self.traces.write().await;
389 traces.retain(|_, span| span.start_time >= retention_threshold);
390
391 if traces.len() > self.config.max_traces_stored {
392 let excess = traces.len() - self.config.max_traces_stored;
393 let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
394
395 for key in keys_to_remove {
396 traces.remove(&key);
397 }
398 }
399 }
400
401 {
402 let mut alerts = self.alerts.write().await;
403 alerts.retain(|a| a.timestamp >= retention_threshold);
404 }
405
406 dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
407 }
408
409 fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }
410
411 fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }
412
413 fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }
414
415 fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }
416
417 fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
418
419 fn metric_type_name(metric_type:&MetricType) -> &'static str {
420 match metric_type {
421 MetricType::MessageProcessingTime => "Message Processing Time",
422 MetricType::ConnectionLatency => "Connection Latency",
423 MetricType::MemoryUsage => "Memory Usage",
424 MetricType::CpuUsage => "CPU Usage",
425 MetricType::NetworkThroughput => "Network Throughput",
426 MetricType::ErrorRate => "Error Rate",
427 MetricType::QueueSize => "Queue Size",
428 }
429 }
430
431 pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
432
433 pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
434 let metrics = self.metrics.read().await;
435 metrics.iter().rev().take(limit).cloned().collect()
436 }
437
438 pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
439 let alerts = self.alerts.read().await;
440 alerts.iter().rev().cloned().collect()
441 }
442
443 pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
444 let traces = self.traces.read().await;
445 traces.values().find(|span| span.trace_id == trace_id).cloned()
446 }
447
448 pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
449
450 pub fn high_frequency_dashboard() -> Self {
451 Self::new(DashboardConfig {
452 update_interval_ms:1000,
453 metrics_retention_hours:1,
454 alert_threshold_ms:500,
455 trace_sampling_rate:1.0,
456 max_traces_stored:5000,
457 })
458 }
459
460 pub fn create_metric(
461 metric_type:MetricType,
462 value:f64,
463 channel:Option<String>,
464 tags:HashMap<String, String>,
465 ) -> PerformanceMetric {
466 PerformanceMetric {
467 metric_type,
468 value,
469 timestamp:SystemTime::now()
470 .duration_since(SystemTime::UNIX_EPOCH)
471 .unwrap_or_default()
472 .as_millis() as u64,
473 channel,
474 tags,
475 }
476 }
477
478 pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
479 TraceLog {
480 timestamp:SystemTime::now()
481 .duration_since(SystemTime::UNIX_EPOCH)
482 .unwrap_or_default()
483 .as_millis() as u64,
484 message,
485 level,
486 fields,
487 }
488 }
489
490 pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
491 let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
492 let error_score = 100.0 * (1.0 - error_rate / 100.0);
493 let throughput_score = throughput / 1000.0;
494
495 (time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
496 .max(0.0)
497 .min(100.0)
498 }
499
500 pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
501 match metric_type {
502 MetricType::MessageProcessingTime => format!("{:.2}ms", value),
503 MetricType::ConnectionLatency => format!("{:.2}ms", value),
504 MetricType::MemoryUsage => format!("{:.2}MB", value),
505 MetricType::CpuUsage => format!("{:.2}%", value),
506 MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
507 MetricType::ErrorRate => format!("{:.2}%", value),
508 MetricType::QueueSize => format!("{:.0}", value),
509 }
510 }
511}
512
513impl Clone for Struct {
514 fn clone(&self) -> Self {
515 Self {
516 config:self.config.clone(),
517 metrics:self.metrics.clone(),
518 traces:self.traces.clone(),
519 alerts:self.alerts.clone(),
520 statistics:self.statistics.clone(),
521 is_running:Arc::new(AsyncMutex::new(false)),
522 }
523 }
524}