// Mountain/IPC/StatusReporter/Reporter.rs
1#![allow(non_snake_case)]
2
3//! `StatusReporter` aggregator - holds the IPC server handle,
4//! status history ring (last 100), error counter, performance
5//! / health / service-registry shared state, and emits
6//! periodic snapshots to Sky.
7//!
8//! The struct + 30-method impl live in one file because the
9//! method bodies are tightly coupled with the private fields
10//! and with the DTO siblings; splitting per-method forces
11//! ~30 trivial wrappers without payback.
12
13use std::{
14	collections::{HashMap, HashSet},
15	sync::{Arc, Mutex},
16	time::{Duration, SystemTime},
17};
18
19use tauri::Emitter;
20use tokio::sync::RwLock;
21
22use crate::{
23	IPC::StatusReporter::{
24		ComprehensiveStatusReport::Struct as ComprehensiveStatusReport,
25		ConnectionStatus::Struct as ConnectionStatus,
26		HealthIssue::Struct as HealthIssue,
27		HealthIssueType::Enum as HealthIssueType,
28		HealthMonitor::Struct as HealthMonitor,
29		IPCStatusReport::Struct as IPCStatusReport,
30		MessageStats::Struct as MessageStats,
31		PerformanceMetrics::Struct as PerformanceMetrics,
32		ServiceInfo::Struct as ServiceInfo,
33		ServiceMetrics::Struct as ServiceMetrics,
34		ServiceRegistry::Struct as ServiceRegistry,
35		ServiceStatus::Enum as ServiceStatus,
36		SeverityLevel::Enum as SeverityLevel,
37	},
38	RunTime::ApplicationRunTime::ApplicationRunTime,
39	dev_log,
40};
41
/// Aggregated IPC status-reporting state; all shared pieces are behind
/// `Arc`s so `clone_reporter` can hand cheap handles to spawned tasks.
pub struct Struct {
	// Application runtime; supplies the Tauri handle used to emit events to Sky.
	pub(super) runtime:Arc<ApplicationRunTime>,
	// Wired in later via `set_ipc_server`; `None` until then.
	pub(super) ipc_server:Option<Arc<crate::IPC::TauriIPCServer_Old::TauriIPCServer>>,
	// Rolling window of recently generated reports (last 100, oldest evicted first).
	pub(super) status_history:Arc<Mutex<Vec<IPCStatusReport>>>,
	// Construction instant; basis for all uptime calculations.
	pub(super) start_time:SystemTime,
	// Errors recorded via `record_error`; reset to 0 by `attempt_recovery`.
	pub(super) error_count:Arc<Mutex<u32>>,
	// Latest computed performance snapshot (written by `update_performance_metrics`).
	pub(super) performance_metrics:Arc<Mutex<PerformanceMetrics>>,
	// Health score, detected issues, and recovery-attempt counter.
	pub(super) health_monitor:Arc<Mutex<HealthMonitor>>,
	// Known services keyed by name, plus discovery bookkeeping.
	pub(super) service_registry:Arc<RwLock<ServiceRegistry>>,
	// Names of services that discovery has seen at least once.
	pub(super) discovered_services:Arc<RwLock<HashSet<String>>>,
}
53
54impl Struct {
55	pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
56		dev_log!("lifecycle", "Creating IPC status reporter");
57
58		Self {
59			runtime,
60			ipc_server:None,
61			status_history:Arc::new(Mutex::new(Vec::new())),
62			start_time:SystemTime::now(),
63			error_count:Arc::new(Mutex::new(0)),
64			performance_metrics:Arc::new(Mutex::new(PerformanceMetrics {
65				messages_per_second:0.0,
66				average_latency_ms:0.0,
67				peak_latency_ms:0.0,
68				compression_ratio:1.0,
69				connection_pool_utilization:0.0,
70				memory_usage_mb:0.0,
71				cpu_usage_percent:0.0,
72				last_update:SystemTime::now()
73					.duration_since(SystemTime::UNIX_EPOCH)
74					.unwrap_or_default()
75					.as_millis() as u64,
76			})),
77			health_monitor:Arc::new(Mutex::new(HealthMonitor {
78				health_score:100.0,
79				last_health_check:SystemTime::now()
80					.duration_since(SystemTime::UNIX_EPOCH)
81					.unwrap_or_default()
82					.as_millis() as u64,
83				issues_detected:Vec::new(),
84				recovery_attempts:0,
85			})),
86			service_registry:Arc::new(RwLock::new(ServiceRegistry {
87				services:HashMap::new(),
88				last_discovery:SystemTime::now()
89					.duration_since(SystemTime::UNIX_EPOCH)
90					.unwrap_or_default()
91					.as_millis() as u64,
92				discovery_interval:30000,
93			})),
94			discovered_services:Arc::new(RwLock::new(HashSet::new())),
95		}
96	}
97
98	pub fn set_ipc_server(&mut self, ipc_server:Arc<crate::IPC::TauriIPCServer_Old::TauriIPCServer>) {
99		self.ipc_server = Some(ipc_server);
100	}
101
102	pub async fn generate_status_report(&self) -> Result<IPCStatusReport, String> {
103		dev_log!("lifecycle", "Generating IPC status report");
104
105		let ipc_server = self.ipc_server.as_ref().ok_or("IPC Server not set".to_string())?;
106
107		let connection_status = ConnectionStatus {
108			is_connected:ipc_server.get_connection_status()?,
109			last_heartbeat:SystemTime::now()
110				.duration_since(SystemTime::UNIX_EPOCH)
111				.unwrap_or_default()
112				.as_secs(),
113			connection_duration:SystemTime::now().duration_since(self.start_time).unwrap_or_default().as_secs(),
114		};
115
116		let message_queue_size = ipc_server.get_queue_size()?;
117
118		let active_listeners = vec!["configuration".to_string(), "file".to_string(), "storage".to_string()];
119
120		let recent_messages = vec![
121			MessageStats {
122				channel:"configuration".to_string(),
123				message_count:10,
124				last_message_time:SystemTime::now()
125					.duration_since(SystemTime::UNIX_EPOCH)
126					.unwrap_or_default()
127					.as_secs(),
128				average_processing_time_ms:5.0,
129			},
130			MessageStats {
131				channel:"file".to_string(),
132				message_count:5,
133				last_message_time:SystemTime::now()
134					.duration_since(SystemTime::UNIX_EPOCH)
135					.unwrap_or_default()
136					.as_secs() - 10,
137				average_processing_time_ms:15.0,
138			},
139		];
140
141		let error_count = {
142			let guard = self
143				.error_count
144				.lock()
145				.map_err(|e| format!("Failed to get error count: {}", e))?;
146			*guard
147		};
148
149		let uptime_seconds = SystemTime::now().duration_since(self.start_time).unwrap_or_default().as_secs();
150
151		let report = IPCStatusReport {
152			timestamp:SystemTime::now()
153				.duration_since(SystemTime::UNIX_EPOCH)
154				.unwrap_or_default()
155				.as_millis() as u64,
156			connection_status,
157			message_queue_size,
158			active_listeners,
159			recent_messages,
160			error_count,
161			uptime_seconds,
162		};
163
164		{
165			let mut history = self
166				.status_history
167				.lock()
168				.map_err(|e| format!("Failed to access status history: {}", e))?;
169			history.push(report.clone());
170
171			if history.len() > 100 {
172				history.remove(0);
173			}
174		}
175
176		Ok(report)
177	}
178
179	pub async fn report_to_sky(&self) -> Result<(), String> {
180		dev_log!("lifecycle", "Reporting IPC status to Sky");
181
182		let report = self.generate_status_report().await?;
183
184		self.update_performance_metrics().await?;
185		self.perform_health_check().await?;
186
187		let performance_metrics = self.get_performance_metrics()?;
188		let health_status = self.get_health_status()?;
189
190		let comprehensive_report = ComprehensiveStatusReport {
191			basic_status:report.clone(),
192			performance_metrics:performance_metrics.clone(),
193			health_status:health_status.clone(),
194			timestamp:SystemTime::now()
195				.duration_since(SystemTime::UNIX_EPOCH)
196				.unwrap_or_default()
197				.as_millis() as u64,
198		};
199
200		if let Err(e) = self
201			.runtime
202			.Environment
203			.ApplicationHandle
204			.emit("ipc-status-report", &comprehensive_report)
205		{
206			dev_log!(
207				"lifecycle",
208				"error: [StatusReporter] Failed to emit status report to Sky: {}",
209				e
210			);
211			return Err(format!("Failed to emit status report: {}", e));
212		}
213
214		if let Err(e) = self
215			.runtime
216			.Environment
217			.ApplicationHandle
218			.emit("ipc-performance-metrics", &performance_metrics)
219		{
220			dev_log!("lifecycle", "error: [StatusReporter] Failed to emit performance metrics: {}", e);
221		}
222
223		if let Err(e) = self
224			.runtime
225			.Environment
226			.ApplicationHandle
227			.emit("ipc-health-status", &health_status)
228		{
229			dev_log!("lifecycle", "error: [StatusReporter] Failed to emit health status: {}", e);
230		}
231
232		dev_log!("lifecycle", "Comprehensive status report sent to Sky");
233		Ok(())
234	}
235
236	pub async fn start_periodic_reporting(&self, interval_seconds:u64) -> Result<(), String> {
237		dev_log!(
238			"lifecycle",
239			"[StatusReporter] Starting periodic status reporting (interval: {}s)",
240			interval_seconds
241		);
242
243		let reporter = self.clone_reporter();
244
245		tokio::spawn(async move {
246			let mut interval = tokio::time::interval(Duration::from_secs(interval_seconds));
247
248			loop {
249				interval.tick().await;
250
251				if let Err(e) = reporter.report_to_sky().await {
252					dev_log!("lifecycle", "error: [StatusReporter] Periodic reporting failed: {}", e);
253				}
254			}
255		});
256
257		Ok(())
258	}
259
260	pub fn record_error(&self) {
261		if let Ok(mut error_count) = self.error_count.lock() {
262			*error_count += 1;
263		}
264	}
265
266	pub fn get_status_history(&self) -> Result<Vec<IPCStatusReport>, String> {
267		let history = self
268			.status_history
269			.lock()
270			.map_err(|e| format!("Failed to access status history: {}", e))?;
271		Ok(history.clone())
272	}
273
274	pub fn get_start_time(&self) -> SystemTime { self.start_time }
275
276	pub async fn update_performance_metrics(&self) -> Result<(), String> {
277		let ipc_server = self.ipc_server.as_ref().ok_or("IPC Server not set".to_string())?;
278
279		let connection_stats = ipc_server.get_connection_stats().await.unwrap_or_default();
280
281		let messages_per_second = self.calculate_message_rate().await;
282		let average_latency_ms = self.calculate_average_latency().await;
283		let peak_latency_ms = self.calculate_peak_latency().await;
284		let compression_ratio = self.calculate_compression_ratio().await;
285		let connection_pool_utilization = self.calculate_pool_utilization(&connection_stats).await;
286		let memory_usage_mb = self.get_memory_usage().await;
287		let cpu_usage_percent = self.get_cpu_usage().await;
288		let last_update = SystemTime::now()
289			.duration_since(SystemTime::UNIX_EPOCH)
290			.unwrap_or_default()
291			.as_millis() as u64;
292
293		let mut metrics = self
294			.performance_metrics
295			.lock()
296			.map_err(|e| format!("Failed to access performance metrics: {}", e))?;
297
298		metrics.messages_per_second = messages_per_second;
299		metrics.average_latency_ms = average_latency_ms;
300		metrics.peak_latency_ms = peak_latency_ms;
301		metrics.compression_ratio = compression_ratio;
302		metrics.connection_pool_utilization = connection_pool_utilization;
303		metrics.memory_usage_mb = memory_usage_mb;
304		metrics.cpu_usage_percent = cpu_usage_percent;
305		metrics.last_update = last_update;
306
307		dev_log!(
308			"lifecycle",
309			"[StatusReporter] Performance metrics updated: {:.2} msg/s, {:.2}ms latency",
310			metrics.messages_per_second,
311			metrics.average_latency_ms
312		);
313
314		Ok(())
315	}
316
317	pub async fn perform_health_check(&self) -> Result<(), String> {
318		let mut health_monitor = self
319			.health_monitor
320			.lock()
321			.map_err(|e| format!("Failed to access health monitor: {}", e))?;
322
323		let mut health_score:f64 = 100.0;
324		let mut issues = Vec::new();
325
326		if let Some(ipc_server) = &self.ipc_server {
327			if !ipc_server.get_connection_status()? {
328				health_score -= 25.0;
329				issues.push(HealthIssue {
330					issue_type:HealthIssueType::ConnectionLoss,
331					severity:SeverityLevel::Critical,
332					description:"IPC connection lost".to_string(),
333					detected_at:SystemTime::now()
334						.duration_since(SystemTime::UNIX_EPOCH)
335						.unwrap_or_default()
336						.as_millis() as u64,
337					resolved_at:None,
338				});
339			}
340		}
341
342		if let Some(ipc_server) = &self.ipc_server {
343			let queue_size = ipc_server.get_queue_size()?;
344			if queue_size > 100 {
345				health_score -= 15.0;
346				issues.push(HealthIssue {
347					issue_type:HealthIssueType::QueueOverflow,
348					severity:SeverityLevel::High,
349					description:format!("Message queue overflow: {} messages", queue_size),
350					detected_at:SystemTime::now()
351						.duration_since(SystemTime::UNIX_EPOCH)
352						.unwrap_or_default()
353						.as_millis() as u64,
354					resolved_at:None,
355				});
356			}
357		}
358
359		let metrics = self
360			.performance_metrics
361			.lock()
362			.map_err(|e| format!("Failed to access performance metrics: {}", e))?;
363
364		if metrics.average_latency_ms > 100.0 {
365			health_score -= 20.0;
366			issues.push(HealthIssue {
367				issue_type:HealthIssueType::HighLatency,
368				severity:SeverityLevel::High,
369				description:format!("High latency detected: {:.2}ms", metrics.average_latency_ms),
370				detected_at:SystemTime::now()
371					.duration_since(SystemTime::UNIX_EPOCH)
372					.unwrap_or_default()
373					.as_millis() as u64,
374				resolved_at:None,
375			});
376		}
377
378		health_monitor.health_score = health_score.max(0.0);
379		health_monitor.issues_detected = issues;
380		health_monitor.last_health_check = SystemTime::now()
381			.duration_since(SystemTime::UNIX_EPOCH)
382			.unwrap_or_default()
383			.as_millis() as u64;
384
385		if health_score < 70.0 {
386			dev_log!(
387				"lifecycle",
388				"warn: [StatusReporter] Health check failed: score {:.1}%",
389				health_score
390			);
391
392			if let Err(e) = self
393				.runtime
394				.Environment
395				.ApplicationHandle
396				.emit("ipc-health-alert", &health_monitor.clone())
397			{
398				dev_log!("lifecycle", "error: [StatusReporter] Failed to emit health alert: {}", e);
399			}
400		}
401
402		Ok(())
403	}
404
405	async fn calculate_message_rate(&self) -> f64 {
406		let history = self.get_status_history().unwrap_or_default();
407
408		if history.len() < 2 {
409			return 0.0;
410		}
411
412		let recent_reports:Vec<&IPCStatusReport> = history.iter().rev().take(5).collect();
413
414		let total_messages:u32 = recent_reports
415			.iter()
416			.map(|report| report.recent_messages.iter().map(|m| m.message_count).sum::<u32>())
417			.sum();
418
419		let time_span = if recent_reports.len() > 1 {
420			let first_time = recent_reports.first().unwrap().timestamp;
421			let last_time = recent_reports.last().unwrap().timestamp;
422			(last_time - first_time) as f64 / 1000.0
423		} else {
424			1.0
425		};
426
427		total_messages as f64 / time_span.max(1.0)
428	}
429
430	async fn calculate_average_latency(&self) -> f64 {
431		let history = self.get_status_history().unwrap_or_default();
432
433		if history.is_empty() {
434			return 0.0;
435		}
436
437		let recent_reports:Vec<&IPCStatusReport> = history.iter().rev().take(10).collect();
438
439		let total_latency:f64 = recent_reports
440			.iter()
441			.flat_map(|report| &report.recent_messages)
442			.map(|msg| msg.average_processing_time_ms)
443			.sum();
444
445		let message_count = recent_reports.iter().flat_map(|report| &report.recent_messages).count();
446
447		total_latency / message_count.max(1) as f64
448	}
449
450	async fn calculate_peak_latency(&self) -> f64 {
451		let history = self.get_status_history().unwrap_or_default();
452
453		history
454			.iter()
455			.flat_map(|report| &report.recent_messages)
456			.map(|msg| msg.average_processing_time_ms)
457			.fold(0.0, f64::max)
458	}
459
460	async fn calculate_compression_ratio(&self) -> f64 { 2.5 }
461
462	async fn calculate_pool_utilization(&self, stats:&crate::IPC::TauriIPCServer_Old::ConnectionStats) -> f64 {
463		if stats.total_connections == 0 {
464			return 0.0;
465		}
466
467		stats.total_connections as f64 / stats.max_connections as f64
468	}
469
470	async fn get_memory_usage(&self) -> f64 { 50.0 }
471
472	async fn get_cpu_usage(&self) -> f64 { 15.0 }
473
474	pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, String> {
475		dev_log!("lifecycle", "Starting service discovery");
476
477		let mut registry = self.service_registry.write().await;
478		let mut discovered = self.discovered_services.write().await;
479
480		let mut services = Vec::new();
481
482		let core_services = vec![
483			("EditorService", "1.0.0", ServiceStatus::Running),
484			("ExtensionHostService", "1.0.0", ServiceStatus::Running),
485			("ConfigurationService", "1.0.0", ServiceStatus::Running),
486			("FileService", "1.0.0", ServiceStatus::Running),
487			("StorageService", "1.0.0", ServiceStatus::Running),
488		];
489
490		for (name, version, status) in core_services {
491			let service_info = ServiceInfo {
492				name:name.to_string(),
493				version:version.to_string(),
494				status:status.clone(),
495				last_heartbeat:SystemTime::now()
496					.duration_since(SystemTime::UNIX_EPOCH)
497					.unwrap_or_default()
498					.as_millis() as u64,
499				uptime:SystemTime::now().duration_since(self.start_time).unwrap_or_default().as_secs(),
500				dependencies:self.get_service_dependencies(name),
501				metrics:ServiceMetrics {
502					response_time:self.calculate_service_response_time(name).await,
503					error_rate:self.calculate_service_error_rate(name).await,
504					throughput:self.calculate_service_throughput(name).await,
505					memory_usage:self.get_service_memory_usage(name).await,
506					cpu_usage:self.get_service_cpu_usage(name).await,
507					last_updated:SystemTime::now()
508						.duration_since(SystemTime::UNIX_EPOCH)
509						.unwrap_or_default()
510						.as_millis() as u64,
511				},
512				endpoint:Some(format!("localhost:{}", 50050 + services.len() as u16)),
513				port:Some(50050 + services.len() as u16),
514			};
515
516			registry.services.insert(name.to_string(), service_info.clone());
517			discovered.insert(name.to_string());
518			services.push(service_info);
519		}
520
521		registry.last_discovery = SystemTime::now()
522			.duration_since(SystemTime::UNIX_EPOCH)
523			.unwrap_or_default()
524			.as_millis() as u64;
525
526		dev_log!(
527			"lifecycle",
528			"[StatusReporter] Service discovery completed: {} services found",
529			services.len()
530		);
531
532		if let Err(e) = self
533			.runtime
534			.Environment
535			.ApplicationHandle
536			.emit("mountain_service_discovery", &services)
537		{
538			dev_log!(
539				"lifecycle",
540				"error: [StatusReporter] Failed to emit service discovery event: {}",
541				e
542			);
543		}
544
545		Ok(services)
546	}
547
548	fn get_service_dependencies(&self, service_name:&str) -> Vec<String> {
549		match service_name {
550			"ExtensionHostService" => vec!["ConfigurationService".to_string()],
551			"FileService" => vec!["StorageService".to_string()],
552			"StorageService" => vec!["ConfigurationService".to_string()],
553			_ => Vec::new(),
554		}
555	}
556
557	async fn calculate_service_response_time(&self, service_name:&str) -> f64 {
558		match service_name {
559			"EditorService" => 5.0,
560			"ExtensionHostService" => 15.0,
561			"ConfigurationService" => 2.0,
562			"FileService" => 8.0,
563			"StorageService" => 3.0,
564			_ => 10.0,
565		}
566	}
567
568	async fn calculate_service_error_rate(&self, service_name:&str) -> f64 {
569		match service_name {
570			"EditorService" => 0.1,
571			"ExtensionHostService" => 2.5,
572			"ConfigurationService" => 0.5,
573			"FileService" => 1.2,
574			"StorageService" => 0.8,
575			_ => 5.0,
576		}
577	}
578
579	async fn calculate_service_throughput(&self, service_name:&str) -> f64 {
580		match service_name {
581			"EditorService" => 1000.0,
582			"ExtensionHostService" => 500.0,
583			"ConfigurationService" => 2000.0,
584			"FileService" => 800.0,
585			"StorageService" => 1500.0,
586			_ => 100.0,
587		}
588	}
589
590	async fn get_service_memory_usage(&self, service_name:&str) -> f64 {
591		match service_name {
592			"EditorService" => 256.0,
593			"ExtensionHostService" => 512.0,
594			"ConfigurationService" => 128.0,
595			"FileService" => 192.0,
596			"StorageService" => 64.0,
597			_ => 100.0,
598		}
599	}
600
601	async fn get_service_cpu_usage(&self, service_name:&str) -> f64 {
602		match service_name {
603			"EditorService" => 15.0,
604			"ExtensionHostService" => 25.0,
605			"ConfigurationService" => 5.0,
606			"FileService" => 10.0,
607			"StorageService" => 8.0,
608			_ => 20.0,
609		}
610	}
611
612	pub async fn start_periodic_discovery(&self) -> Result<(), String> {
613		dev_log!("lifecycle", "Starting periodic service discovery");
614
615		let registry = self.service_registry.read().await;
616		let interval = registry.discovery_interval;
617		drop(registry);
618
619		let reporter = self.clone_reporter();
620
621		tokio::spawn(async move {
622			let mut interval = tokio::time::interval(Duration::from_millis(interval));
623
624			loop {
625				interval.tick().await;
626
627				if let Err(e) = reporter.discover_services().await {
628					dev_log!("lifecycle", "error: [StatusReporter] Periodic service discovery failed: {}", e);
629				}
630			}
631		});
632
633		Ok(())
634	}
635
636	pub async fn get_service_registry(&self) -> Result<ServiceRegistry, String> {
637		let registry = self.service_registry.read().await;
638		Ok(registry.clone())
639	}
640
641	pub async fn get_service_info(&self, service_name:&str) -> Result<Option<ServiceInfo>, String> {
642		let registry = self.service_registry.read().await;
643		Ok(registry.services.get(service_name).cloned())
644	}
645
646	pub async fn attempt_recovery(&self) -> Result<(), String> {
647		let mut health_monitor = self
648			.health_monitor
649			.lock()
650			.map_err(|e| format!("Failed to access health monitor: {}", e))?;
651
652		health_monitor.recovery_attempts += 1;
653
654		if let Some(ipc_server) = &self.ipc_server {
655			if let Err(e) = ipc_server.dispose() {
656				return Err(format!("Failed to dispose IPC server: {}", e));
657			}
658
659			if let Err(e) = ipc_server.initialize().await {
660				return Err(format!("Failed to reinitialize IPC server: {}", e));
661			}
662		}
663
664		if let Ok(mut error_count) = self.error_count.lock() {
665			*error_count = 0;
666		}
667
668		dev_log!(
669			"lifecycle",
670			"[StatusReporter] Recovery attempt {} completed",
671			health_monitor.recovery_attempts
672		);
673		Ok(())
674	}
675
676	pub fn get_performance_metrics(&self) -> Result<PerformanceMetrics, String> {
677		let metrics = self
678			.performance_metrics
679			.lock()
680			.map_err(|e| format!("Failed to access performance metrics: {}", e))?;
681		Ok(metrics.clone())
682	}
683
684	pub fn get_health_status(&self) -> Result<HealthMonitor, String> {
685		let health_monitor = self
686			.health_monitor
687			.lock()
688			.map_err(|e| format!("Failed to access health monitor: {}", e))?;
689		Ok(health_monitor.clone())
690	}
691
692	pub(super) fn clone_reporter(&self) -> Struct {
693		Struct {
694			runtime:self.runtime.clone(),
695			ipc_server:self.ipc_server.clone(),
696			status_history:self.status_history.clone(),
697			start_time:self.start_time,
698			error_count:self.error_count.clone(),
699			performance_metrics:self.performance_metrics.clone(),
700			health_monitor:self.health_monitor.clone(),
701			service_registry:self.service_registry.clone(),
702			discovered_services:self.discovered_services.clone(),
703		}
704	}
705}