Skip to main content

Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs

1#![allow(non_snake_case)]
2
3//! `Pool::Struct` - bounded connection pool with health
4//! monitoring, idle/lifetime cleanup, wait-queue timeouts, and
5//! statistics. Acquire via `get_connection` (drops a permit on
6//! the inner `Semaphore`); return via `release_connection`.
7//! The struct + 18-method impl + Clone + tests stay in one
8//! file - tightly coupled cluster.
9
10use std::{
11	collections::HashMap,
12	sync::Arc,
13	time::{Duration, Instant},
14};
15
16use tokio::{
17	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
18	time::{interval, timeout},
19};
20
21use crate::{
22	IPC::Enhanced::ConnectionPool::{
23		ConnectionHandle::Struct as ConnectionHandle,
24		HealthChecker::Struct as HealthChecker,
25		PoolConfig::Struct as PoolConfig,
26		PoolStats::Struct as PoolStats,
27	},
28	dev_log,
29};
30
31pub struct Struct {
32	pub config:PoolConfig,
33	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
34	pub semaphore:Arc<Semaphore>,
35	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
36	pub stats:Arc<RwLock<PoolStats>>,
37	pub health_checker:Arc<AsyncMutex<HealthChecker>>,
38	pub is_running:Arc<AsyncMutex<bool>>,
39}
40
41impl Struct {
42	pub fn new(config:PoolConfig) -> Self {
43		let max_connections = config.max_connections;
44		let min_connections = config.min_connections;
45
46		let pool = Self {
47			config:config.clone(),
48			connections:Arc::new(AsyncMutex::new(HashMap::new())),
49			semaphore:Arc::new(Semaphore::new(max_connections)),
50			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
51			stats:Arc::new(RwLock::new(PoolStats {
52				total_connections:0,
53				active_connections:0,
54				idle_connections:0,
55				healthy_connections:0,
56				max_connections,
57				min_connections,
58				wait_queue_size:0,
59				average_wait_time_ms:0.0,
60				total_operations:0,
61				successful_operations:0,
62				error_rate:0.0,
63			})),
64			health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
65			is_running:Arc::new(AsyncMutex::new(false)),
66		};
67
68		dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
69		pool
70	}
71
72	pub async fn start(&self) -> Result<(), String> {
73		{
74			let mut running = self.is_running.lock().await;
75			if *running {
76				return Ok(());
77			}
78			*running = true;
79		}
80
81		self.start_health_monitoring().await;
82		self.start_connection_cleanup().await;
83		self.initialize_min_connections().await;
84
85		dev_log!("ipc", "[ConnectionPool] Started connection pool");
86		Ok(())
87	}
88
89	pub async fn stop(&self) -> Result<(), String> {
90		{
91			let mut running = self.is_running.lock().await;
92			if !*running {
93				return Ok(());
94			}
95			*running = false;
96		}
97
98		{
99			let mut connections = self.connections.lock().await;
100			connections.clear();
101		}
102
103		{
104			let mut wait_queue = self.wait_queue.lock().await;
105			for notifier in wait_queue.drain(..) {
106				notifier.notify_one();
107			}
108		}
109
110		dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
111		Ok(())
112	}
113
114	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
115		let start_time = Instant::now();
116
117		let _permit = timeout(
118			Duration::from_millis(self.config.connection_timeout_ms),
119			self.semaphore.acquire(),
120		)
121		.await
122		.map_err(|_| "Connection timeout".to_string())?
123		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
124
125		let wait_time = start_time.elapsed().as_millis() as f64;
126
127		{
128			let mut stats = self.stats.write().await;
129			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
130				/ (stats.total_operations as f64 + 1.0);
131		}
132
133		let connection = self.find_or_create_connection().await?;
134
135		{
136			let mut stats = self.stats.write().await;
137			stats.active_connections += 1;
138			stats.total_operations += 1;
139		}
140
141		dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
142		Ok(connection)
143	}
144
145	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
146		let connection_id = handle.id.clone();
147		handle.last_used = Instant::now();
148
149		{
150			let mut connections = self.connections.lock().await;
151			connections.insert(handle.id.clone(), handle.clone());
152		}
153
154		{
155			let mut stats = self.stats.write().await;
156			stats.active_connections = stats.active_connections.saturating_sub(1);
157			stats.idle_connections += 1;
158		}
159
160		drop(handle);
161
162		dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
163	}
164
165	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
166		let mut connections = self.connections.lock().await;
167
168		for (_id, handle) in connections.iter_mut() {
169			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
170				handle.last_used = Instant::now();
171				return Ok(handle.clone());
172			}
173		}
174
175		let new_handle = ConnectionHandle::new();
176		connections.insert(new_handle.id.clone(), new_handle.clone());
177
178		{
179			let mut stats = self.stats.write().await;
180			stats.total_connections += 1;
181			stats.healthy_connections += 1;
182		}
183
184		Ok(new_handle)
185	}
186
187	async fn start_health_monitoring(&self) {
188		let pool = Arc::new(self.clone());
189
190		tokio::spawn(async move {
191			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
192
193			while *pool.is_running.lock().await {
194				interval.tick().await;
195
196				if let Err(e) = pool.check_connection_health().await {
197					dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
198				}
199			}
200		});
201	}
202
203	async fn start_connection_cleanup(&self) {
204		let pool = Arc::new(self.clone());
205
206		tokio::spawn(async move {
207			let mut interval = interval(Duration::from_secs(60));
208
209			while *pool.is_running.lock().await {
210				interval.tick().await;
211
212				let cleaned_count = pool.cleanup_stale_connections().await;
213				if cleaned_count > 0 {
214					dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
215				}
216			}
217		});
218	}
219
220	async fn initialize_min_connections(&self) {
221		let current_count = self.connections.lock().await.len();
222
223		if current_count < self.config.min_connections {
224			let needed = self.config.min_connections - current_count;
225
226			for _ in 0..needed {
227				let handle = ConnectionHandle::new();
228				let mut connections = self.connections.lock().await;
229				connections.insert(handle.id.clone(), handle);
230			}
231
232			dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
233		}
234	}
235
236	async fn check_connection_health(&self) -> Result<(), String> {
237		let mut connections = self.connections.lock().await;
238		let mut _health_checker = self.health_checker.lock().await;
239
240		let mut healthy_count = 0;
241
242		for (_id, handle) in connections.iter_mut() {
243			let is_healthy = _health_checker.check_connection_health(handle).await;
244			handle.update_health(is_healthy);
245
246			if handle.is_healthy() {
247				healthy_count += 1;
248			}
249		}
250
251		{
252			let mut stats = self.stats.write().await;
253			stats.healthy_connections = healthy_count;
254			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
255
256			if stats.total_operations > 0 {
257				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
258			}
259		}
260
261		Ok(())
262	}
263
264	pub async fn cleanup_stale_connections(&self) -> usize {
265		let mut connections = self.connections.lock().await;
266
267		let stale_ids:Vec<String> = connections
268			.iter()
269			.filter(|(_, handle)| {
270				handle.age().as_millis() > self.config.max_lifetime_ms as u128
271					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
272					|| !handle.is_healthy()
273			})
274			.map(|(id, _)| id.clone())
275			.collect();
276
277		for id in &stale_ids {
278			connections.remove(id);
279		}
280
281		{
282			let mut stats = self.stats.write().await;
283			stats.total_connections = connections.len();
284			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
285		}
286
287		stale_ids.len()
288	}
289
290	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
291
292	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
293
294	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
295
296	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
297
298	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
299
300	pub fn high_performance_pool() -> Self {
301		Self::new(PoolConfig {
302			max_connections:50,
303			min_connections:10,
304			connection_timeout_ms:10000,
305			max_lifetime_ms:180000,
306			idle_timeout_ms:30000,
307			health_check_interval_ms:15000,
308		})
309	}
310
311	pub fn conservative_pool() -> Self {
312		Self::new(PoolConfig {
313			max_connections:5,
314			min_connections:1,
315			connection_timeout_ms:60000,
316			max_lifetime_ms:600000,
317			idle_timeout_ms:120000,
318			health_check_interval_ms:60000,
319		})
320	}
321
322	pub fn calculate_optimal_pool_size() -> usize {
323		let num_cpus = num_cpus::get();
324		(num_cpus * 2).max(4).min(50)
325	}
326}
327
328impl Clone for Struct {
329	fn clone(&self) -> Self {
330		Self {
331			config:self.config.clone(),
332			connections:self.connections.clone(),
333			semaphore:self.semaphore.clone(),
334			wait_queue:self.wait_queue.clone(),
335			stats:self.stats.clone(),
336			health_checker:self.health_checker.clone(),
337			is_running:self.is_running.clone(),
338		}
339	}
340}