Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs1#![allow(non_snake_case)]
2
3use std::{
11 collections::HashMap,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use tokio::{
17 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
18 time::{interval, timeout},
19};
20
21use crate::{
22 IPC::Enhanced::ConnectionPool::{
23 ConnectionHandle::Struct as ConnectionHandle,
24 HealthChecker::Struct as HealthChecker,
25 PoolConfig::Struct as PoolConfig,
26 PoolStats::Struct as PoolStats,
27 },
28 dev_log,
29};
30
31pub struct Struct {
32 pub config:PoolConfig,
33 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
34 pub semaphore:Arc<Semaphore>,
35 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
36 pub stats:Arc<RwLock<PoolStats>>,
37 pub health_checker:Arc<AsyncMutex<HealthChecker>>,
38 pub is_running:Arc<AsyncMutex<bool>>,
39}
40
41impl Struct {
42 pub fn new(config:PoolConfig) -> Self {
43 let max_connections = config.max_connections;
44 let min_connections = config.min_connections;
45
46 let pool = Self {
47 config:config.clone(),
48 connections:Arc::new(AsyncMutex::new(HashMap::new())),
49 semaphore:Arc::new(Semaphore::new(max_connections)),
50 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
51 stats:Arc::new(RwLock::new(PoolStats {
52 total_connections:0,
53 active_connections:0,
54 idle_connections:0,
55 healthy_connections:0,
56 max_connections,
57 min_connections,
58 wait_queue_size:0,
59 average_wait_time_ms:0.0,
60 total_operations:0,
61 successful_operations:0,
62 error_rate:0.0,
63 })),
64 health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
65 is_running:Arc::new(AsyncMutex::new(false)),
66 };
67
68 dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
69 pool
70 }
71
72 pub async fn start(&self) -> Result<(), String> {
73 {
74 let mut running = self.is_running.lock().await;
75 if *running {
76 return Ok(());
77 }
78 *running = true;
79 }
80
81 self.start_health_monitoring().await;
82 self.start_connection_cleanup().await;
83 self.initialize_min_connections().await;
84
85 dev_log!("ipc", "[ConnectionPool] Started connection pool");
86 Ok(())
87 }
88
89 pub async fn stop(&self) -> Result<(), String> {
90 {
91 let mut running = self.is_running.lock().await;
92 if !*running {
93 return Ok(());
94 }
95 *running = false;
96 }
97
98 {
99 let mut connections = self.connections.lock().await;
100 connections.clear();
101 }
102
103 {
104 let mut wait_queue = self.wait_queue.lock().await;
105 for notifier in wait_queue.drain(..) {
106 notifier.notify_one();
107 }
108 }
109
110 dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
111 Ok(())
112 }
113
114 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
115 let start_time = Instant::now();
116
117 let _permit = timeout(
118 Duration::from_millis(self.config.connection_timeout_ms),
119 self.semaphore.acquire(),
120 )
121 .await
122 .map_err(|_| "Connection timeout".to_string())?
123 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
124
125 let wait_time = start_time.elapsed().as_millis() as f64;
126
127 {
128 let mut stats = self.stats.write().await;
129 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
130 / (stats.total_operations as f64 + 1.0);
131 }
132
133 let connection = self.find_or_create_connection().await?;
134
135 {
136 let mut stats = self.stats.write().await;
137 stats.active_connections += 1;
138 stats.total_operations += 1;
139 }
140
141 dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
142 Ok(connection)
143 }
144
145 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
146 let connection_id = handle.id.clone();
147 handle.last_used = Instant::now();
148
149 {
150 let mut connections = self.connections.lock().await;
151 connections.insert(handle.id.clone(), handle.clone());
152 }
153
154 {
155 let mut stats = self.stats.write().await;
156 stats.active_connections = stats.active_connections.saturating_sub(1);
157 stats.idle_connections += 1;
158 }
159
160 drop(handle);
161
162 dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
163 }
164
165 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
166 let mut connections = self.connections.lock().await;
167
168 for (_id, handle) in connections.iter_mut() {
169 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
170 handle.last_used = Instant::now();
171 return Ok(handle.clone());
172 }
173 }
174
175 let new_handle = ConnectionHandle::new();
176 connections.insert(new_handle.id.clone(), new_handle.clone());
177
178 {
179 let mut stats = self.stats.write().await;
180 stats.total_connections += 1;
181 stats.healthy_connections += 1;
182 }
183
184 Ok(new_handle)
185 }
186
187 async fn start_health_monitoring(&self) {
188 let pool = Arc::new(self.clone());
189
190 tokio::spawn(async move {
191 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
192
193 while *pool.is_running.lock().await {
194 interval.tick().await;
195
196 if let Err(e) = pool.check_connection_health().await {
197 dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
198 }
199 }
200 });
201 }
202
203 async fn start_connection_cleanup(&self) {
204 let pool = Arc::new(self.clone());
205
206 tokio::spawn(async move {
207 let mut interval = interval(Duration::from_secs(60));
208
209 while *pool.is_running.lock().await {
210 interval.tick().await;
211
212 let cleaned_count = pool.cleanup_stale_connections().await;
213 if cleaned_count > 0 {
214 dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
215 }
216 }
217 });
218 }
219
220 async fn initialize_min_connections(&self) {
221 let current_count = self.connections.lock().await.len();
222
223 if current_count < self.config.min_connections {
224 let needed = self.config.min_connections - current_count;
225
226 for _ in 0..needed {
227 let handle = ConnectionHandle::new();
228 let mut connections = self.connections.lock().await;
229 connections.insert(handle.id.clone(), handle);
230 }
231
232 dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
233 }
234 }
235
236 async fn check_connection_health(&self) -> Result<(), String> {
237 let mut connections = self.connections.lock().await;
238 let mut _health_checker = self.health_checker.lock().await;
239
240 let mut healthy_count = 0;
241
242 for (_id, handle) in connections.iter_mut() {
243 let is_healthy = _health_checker.check_connection_health(handle).await;
244 handle.update_health(is_healthy);
245
246 if handle.is_healthy() {
247 healthy_count += 1;
248 }
249 }
250
251 {
252 let mut stats = self.stats.write().await;
253 stats.healthy_connections = healthy_count;
254 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
255
256 if stats.total_operations > 0 {
257 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
258 }
259 }
260
261 Ok(())
262 }
263
264 pub async fn cleanup_stale_connections(&self) -> usize {
265 let mut connections = self.connections.lock().await;
266
267 let stale_ids:Vec<String> = connections
268 .iter()
269 .filter(|(_, handle)| {
270 handle.age().as_millis() > self.config.max_lifetime_ms as u128
271 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
272 || !handle.is_healthy()
273 })
274 .map(|(id, _)| id.clone())
275 .collect();
276
277 for id in &stale_ids {
278 connections.remove(id);
279 }
280
281 {
282 let mut stats = self.stats.write().await;
283 stats.total_connections = connections.len();
284 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
285 }
286
287 stale_ids.len()
288 }
289
290 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
291
292 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
293
294 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
295
296 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
297
298 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
299
300 pub fn high_performance_pool() -> Self {
301 Self::new(PoolConfig {
302 max_connections:50,
303 min_connections:10,
304 connection_timeout_ms:10000,
305 max_lifetime_ms:180000,
306 idle_timeout_ms:30000,
307 health_check_interval_ms:15000,
308 })
309 }
310
311 pub fn conservative_pool() -> Self {
312 Self::new(PoolConfig {
313 max_connections:5,
314 min_connections:1,
315 connection_timeout_ms:60000,
316 max_lifetime_ms:600000,
317 idle_timeout_ms:120000,
318 health_check_interval_ms:60000,
319 })
320 }
321
322 pub fn calculate_optimal_pool_size() -> usize {
323 let num_cpus = num_cpus::get();
324 (num_cpus * 2).max(4).min(50)
325 }
326}
327
328impl Clone for Struct {
329 fn clone(&self) -> Self {
330 Self {
331 config:self.config.clone(),
332 connections:self.connections.clone(),
333 semaphore:self.semaphore.clone(),
334 wait_queue:self.wait_queue.clone(),
335 stats:self.stats.clone(),
336 health_checker:self.health_checker.clone(),
337 is_running:self.is_running.clone(),
338 }
339 }
340}