Mountain/IPC/Connection/
Manager.rs1use std::{collections::HashMap, sync::Arc};
2
3use log::debug;
4use tokio::{
5 sync::{Mutex as AsyncMutex, Semaphore},
6 time::{Duration, timeout},
7};
8
9use super::{
10 Health::HealthChecker,
11 Types::{ConnectionHandle, ConnectionStats},
12};
13
/// Alias exposing [`ConnectionPool`] under the name `ConnectionManager`.
pub type ConnectionManager = ConnectionPool;
19
/// Pool of IPC connections: a semaphore for admission plus a registry of the
/// currently active connection handles.
pub struct ConnectionPool {
	/// Number of permits the semaphore is created with; also reported by
	/// `max_connections` and `GetStats`.
	MaxConnections:usize,

	/// Longest time `GetConnection` waits for a semaphore permit before
	/// returning a timeout error.
	ConnectionTimeout:Duration,

	/// Semaphore from which `GetConnection` acquires a permit.
	Semaphore:Arc<Semaphore>,

	/// Registered connection handles, keyed by the handle's `id`.
	ActiveConnections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,

	/// Health checker shared by the background monitoring tasks.
	HealthChecker:Arc<AsyncMutex<HealthChecker>>,
}
86
87impl ConnectionPool {
88 pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100 debug!(
101 "[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
102 MaxConnections, ConnectionTimeout
103 );
104
105 Self {
106 MaxConnections,
107 ConnectionTimeout,
108 Semaphore:Arc::new(Semaphore::new(MaxConnections)),
109 ActiveConnections:Arc::new(AsyncMutex::new(HashMap::new())),
110 HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
111 }
112 }
113
114 pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
118
119 pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
135 debug!("[ConnectionPool] Acquiring connection permit");
136
137 let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
139 .await
140 .map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
141 .map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
142
143 let handle = ConnectionHandle::new();
145
146 {
148 let mut connections = self.ActiveConnections.lock().await;
149 connections.insert(handle.id.clone(), handle.clone());
150 }
151
152 debug!("[ConnectionPool] Connection {} acquired (permit released on drop)", handle.id);
153
154 self.StartHealthMonitoring(&handle.id).await;
156
157 drop(permit);
159
160 Ok(handle)
161 }
162
163 pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
177 debug!("[ConnectionPool] Releasing connection {}", handle.id);
178
179 {
180 let mut connections = self.ActiveConnections.lock().await;
181 connections.remove(&handle.id);
182 }
183
184 debug!("[ConnectionPool] Connection {} released", handle.id);
185 }
186
187 pub async fn GetStats(&self) -> ConnectionStats {
203 let connections = self.ActiveConnections.lock().await;
204 let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
205
206 ConnectionStats {
207 total_connections:connections.len(),
208 healthy_connections,
209 max_connections:self.MaxConnections,
210 available_permits:self.Semaphore.available_permits(),
211 connection_timeout:self.ConnectionTimeout,
212 }
213 }
214
215 pub async fn CleanUpStaleConnections(&self) -> usize {
234 let mut connections = self.ActiveConnections.lock().await;
235 let now = std::time::SystemTime::now();
236 let stale_threshold = Duration::from_secs(300); let stale_ids:Vec<String> = connections
239 .iter()
240 .filter(|(_, handle)| {
241 let is_stale_by_time = match now.duration_since(handle.last_used) {
243 Ok(idle_time) => idle_time > stale_threshold,
244 Err(_) => true, };
246 is_stale_by_time || !handle.is_healthy()
247 })
248 .map(|(id, _)| id.clone())
249 .collect();
250
251 let stale_count = stale_ids.len();
252 for id in stale_ids {
253 debug!("[ConnectionPool] Removing stale connection {}", id);
254 connections.remove(&id);
255 }
256
257 if stale_count > 0 {
258 debug!("[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
259 }
260
261 stale_count
262 }
263
264 async fn StartHealthMonitoring(&self, connection_id:&str) {
272 let health_checker = self.HealthChecker.clone();
273 let active_connections = self.ActiveConnections.clone();
274 let connection_id = connection_id.to_string();
275
276 tokio::spawn(async move {
277 let mut interval = tokio::time::interval(Duration::from_secs(30));
278
279 loop {
280 interval.tick().await;
281
282 let mut checker = health_checker.lock().await;
283 let mut connections = match active_connections.try_lock() {
284 Ok(conns) => conns,
285 Err(_) => continue,
286 };
287
288 if let Some(handle) = connections.get_mut(&connection_id) {
289 let is_healthy = checker.check_connection_health(handle).await;
290 handle.update_health(is_healthy);
291
292 if !handle.is_healthy() {
293 debug!(
294 "[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
295 handle.id, handle.health_score, handle.error_count
296 );
297 }
298 } else {
299 debug!(
301 "[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
302 connection_id
303 );
304 break;
305 }
306 }
307 });
308 }
309
310 pub fn max_connections(&self) -> usize { self.MaxConnections }
312
313 pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
315
316 pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
318
319 pub async fn active_connections(&self) -> usize {
321 let connections = self.ActiveConnections.lock().await;
322 connections.len()
323 }
324}
325
#[cfg(test)]
mod tests {
	use super::*;

	/// A new pool reports its configuration and starts out empty.
	#[tokio::test]
	async fn test_connection_pool_creation() {
		let pool = ConnectionPool::new(10, Duration::from_secs(30));
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
		assert_eq!(pool.available_permits(), 10);
		assert_eq!(pool.active_connections().await, 0);
	}

	/// The default pool uses 10 connections and a 30 second timeout.
	#[tokio::test]
	async fn test_default_connection_pool() {
		let pool = ConnectionPool::default();
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
	}

	/// Acquiring consumes one permit; releasing gives it back.
	#[tokio::test]
	async fn test_get_and_release_connection() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		let handle = pool.GetConnection().await.unwrap();
		assert_eq!(pool.active_connections().await, 1);
		assert_eq!(pool.available_permits(), 4);

		pool.ReleaseConnection(handle).await;
		assert_eq!(pool.active_connections().await, 0);
		assert_eq!(pool.available_permits(), 5);
	}

	/// A full pool blocks further acquisitions until a connection is released.
	#[tokio::test]
	async fn test_multiple_connections() {
		let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));

		let mut handles = Vec::new();
		for _ in 0..3 {
			handles.push(pool.GetConnection().await.unwrap());
		}

		assert_eq!(pool.active_connections().await, 3);
		assert_eq!(pool.available_permits(), 0);

		// The outer timeout is shorter than the pool's own, so an `Err` here
		// means the acquisition was still waiting for a permit.
		let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;
		assert!(result.is_err());

		pool.ReleaseConnection(handles[0].clone()).await;
		assert_eq!(pool.available_permits(), 1);

		let replacement = pool.GetConnection().await.unwrap();
		assert_eq!(pool.available_permits(), 0);

		for handle in handles {
			pool.ReleaseConnection(handle).await;
		}
		pool.ReleaseConnection(replacement).await;
	}

	/// Statistics follow the number of registered connections.
	#[tokio::test]
	async fn test_connection_stats() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 0);
		assert_eq!(stats.healthy_connections, 0);
		assert_eq!(stats.max_connections, 5);
		assert_eq!(stats.utilization(), 0.0);

		for _ in 0..3 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 3);
		assert!(stats.healthy_connections > 0);
		assert!(stats.utilization() > 0.0);
	}

	/// A connection that is both idle past the threshold and unhealthy is
	/// evicted by the cleanup pass.
	#[tokio::test]
	async fn test_cleanup_stale_connections() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		let handle = pool.GetConnection().await.unwrap();

		// Age the entry stored in the pool itself: the returned handle is a
		// separate clone, so changing it would not affect what cleanup sees.
		{
			let mut connections = pool.ActiveConnections.lock().await;
			let pooled = connections.get_mut(&handle.id).expect("connection was just registered");
			pooled.last_used = std::time::SystemTime::now()
				.checked_sub(Duration::from_secs(360))
				.unwrap_or(std::time::SystemTime::UNIX_EPOCH);
			pooled.health_score = 25.0;
		}

		let cleaned = pool.CleanUpStaleConnections().await;

		assert_eq!(cleaned, 1);
		assert_eq!(pool.active_connections().await, 0);
		assert_eq!(pool.available_permits(), 5);
	}

	/// Utilization is the share of the maximum that is currently registered.
	#[tokio::test]
	async fn test_pool_utilization() {
		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

		assert_eq!(pool.GetStats().await.utilization(), 0.0);

		for _ in 0..5 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.utilization(), 50.0);
	}
}