// Mountain/IPC/Enhanced/ConnectionPool.rs

//! # Connection Pooling and Multiplexing
//!
//! Advanced connection pooling for concurrent IPC operations with health
//! monitoring and connection lifecycle management.
5
6use std::{
7	collections::HashMap,
8	sync::Arc,
9	time::{Duration, Instant},
10};
11
12use log::{debug, error, info, trace, warn};
13use serde::{Deserialize, Serialize};
14use tokio::{
15	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
16	time::{interval, timeout},
17};
18use uuid::Uuid;
19
20/// Connection pool configuration
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PoolConfig {
23	pub max_connections:usize,
24	pub min_connections:usize,
25	pub connection_timeout_ms:u64,
26	pub max_lifetime_ms:u64,
27	pub idle_timeout_ms:u64,
28	pub health_check_interval_ms:u64,
29}
30
31impl Default for PoolConfig {
32	fn default() -> Self {
33		Self {
34			max_connections:10,
35			min_connections:2,
36			// Connection timeout: 30 seconds to acquire a connection from the pool.
37			connection_timeout_ms:30000,
38			// Maximum lifetime: 5 minutes before a connection is retired.
39			max_lifetime_ms:300000,
40			// Idle timeout: 1 minute before an unused connection is closed.
41			idle_timeout_ms:60000,
42			// Health check interval: 30 seconds between connection health probes.
43			health_check_interval_ms:30000,
44		}
45	}
46}
47
/// Coarse health classification stored in `ConnectionHandle::health`.
///
/// NOTE(review): within this file only `Healthy` is ever assigned (at handle
/// creation); the exact meaning of the other variants should be confirmed
/// against their users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionHealth {
	/// The connection is considered fully usable.
	Healthy,
	/// The connection is considered unusable.
	Unhealthy,
	/// Presumably usable but with reduced reliability — TODO confirm.
	Degraded,
}
55
56/// Connection handle with health monitoring
57#[derive(Debug, Clone)]
58pub struct ConnectionHandle {
59	pub id:String,
60	pub created_at:Instant,
61	pub last_used:Instant,
62	pub health_score:f64,
63	pub error_count:usize,
64	pub successful_operations:usize,
65	pub total_operations:usize,
66	pub is_active:bool,
67	pub reuse_count:u32,
68	pub health:ConnectionHealth,
69}
70
71impl ConnectionHandle {
72	/// Create a new connection handle
73	pub fn new() -> Self {
74		Self {
75			id:Uuid::new_v4().to_string(),
76			created_at:Instant::now(),
77			last_used:Instant::now(),
78			health_score:100.0,
79			error_count:0,
80			successful_operations:0,
81			total_operations:0,
82			is_active:true,
83			reuse_count:0,
84			health:ConnectionHealth::Healthy,
85		}
86	}
87
88	/// Update health based on operation success
89	pub fn update_health(&mut self, success:bool) {
90		self.last_used = Instant::now();
91		self.total_operations += 1;
92
93		if success {
94			self.successful_operations += 1;
95			// Increase health score gradually
96			self.health_score = (self.health_score + 2.0).min(100.0);
97			self.error_count = 0;
98		} else {
99			self.error_count += 1;
100			// Decrease health score more aggressively
101			self.health_score = (self.health_score - 10.0).max(0.0);
102		}
103
104		// Calculate success rate
105		let success_rate = if self.total_operations > 0 {
106			self.successful_operations as f64 / self.total_operations as f64
107		} else {
108			1.0
109		};
110
111		// Adjust health score based on overall success rate
112		self.health_score = (self.health_score * 0.7 + success_rate * 100.0 * 0.3).max(0.0).min(100.0);
113	}
114
115	/// Check if connection is healthy
116	pub fn is_healthy(&self) -> bool {
117		// Connection is considered healthy if: health score > 50%, fewer than 5 errors,
118		// actively flagged as healthy, and less than 5 minutes old (prevents stale
119		// connections).
120		self.health_score > 50.0 && self.error_count < 5 && self.is_active && self.age().as_secs() < 300
121	}
122
123	/// Get connection age
124	pub fn age(&self) -> Duration { self.created_at.elapsed() }
125
126	/// Get idle time
127	pub fn idle_time(&self) -> Duration { self.last_used.elapsed() }
128
129	/// Get success rate
130	pub fn success_rate(&self) -> f64 {
131		if self.total_operations == 0 {
132			1.0
133		} else {
134			self.successful_operations as f64 / self.total_operations as f64
135		}
136	}
137}
138
139/// Connection pool statistics
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PoolStats {
142	pub total_connections:usize,
143	pub active_connections:usize,
144	pub idle_connections:usize,
145	pub healthy_connections:usize,
146	pub max_connections:usize,
147	pub min_connections:usize,
148	pub wait_queue_size:usize,
149	pub average_wait_time_ms:f64,
150	pub total_operations:u64,
151	pub successful_operations:u64,
152	pub error_rate:f64,
153}
154
155/// Connection pool with advanced management
156pub struct ConnectionPool {
157	pub config:PoolConfig,
158	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
159	pub semaphore:Arc<Semaphore>,
160	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
161	pub stats:Arc<RwLock<PoolStats>>,
162	pub health_checker:Arc<AsyncMutex<ConnectionHealthChecker>>,
163	pub is_running:Arc<AsyncMutex<bool>>,
164}
165
166impl ConnectionPool {
167	/// Create a new connection pool
168	pub fn new(config:PoolConfig) -> Self {
169		let max_connections = config.max_connections;
170		let min_connections = config.min_connections;
171
172		let pool = Self {
173			config:config.clone(),
174			connections:Arc::new(AsyncMutex::new(HashMap::new())),
175			semaphore:Arc::new(Semaphore::new(max_connections)),
176			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
177			stats:Arc::new(RwLock::new(PoolStats {
178				total_connections:0,
179				active_connections:0,
180				idle_connections:0,
181				healthy_connections:0,
182				max_connections,
183				min_connections,
184				wait_queue_size:0,
185				average_wait_time_ms:0.0,
186				total_operations:0,
187				successful_operations:0,
188				error_rate:0.0,
189			})),
190			health_checker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
191			is_running:Arc::new(AsyncMutex::new(false)),
192		};
193
194		info!("[ConnectionPool] Created pool with max {} connections", max_connections);
195		pool
196	}
197
198	/// Start the connection pool
199	pub async fn start(&self) -> Result<(), String> {
200		{
201			let mut running = self.is_running.lock().await;
202			if *running {
203				// If the pool is already running, exit early to avoid duplicate startup.
204				return Ok(());
205			}
206			*running = true;
207		}
208
209		// Start health monitoring
210		self.start_health_monitoring().await;
211
212		// Start connection cleanup
213		self.start_connection_cleanup().await;
214
215		// Initialize minimum connections
216		self.initialize_min_connections().await;
217
218		info!("[ConnectionPool] Started connection pool");
219		Ok(())
220	}
221
222	/// Stop the connection pool
223	pub async fn stop(&self) -> Result<(), String> {
224		{
225			let mut running = self.is_running.lock().await;
226			if !*running {
227				// If the pool is already stopped, exit early to avoid redundant operations.
228				return Ok(());
229			}
230			*running = false;
231		}
232
233		// Clear all connections
234		{
235			let mut connections = self.connections.lock().await;
236			connections.clear();
237		}
238
239		// Notify all waiting tasks
240		{
241			let mut wait_queue = self.wait_queue.lock().await;
242			for notifier in wait_queue.drain(..) {
243				notifier.notify_one();
244			}
245		}
246
247		info!("[ConnectionPool] Stopped connection pool");
248		Ok(())
249	}
250
251	/// Get a connection from the pool
252	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
253		let start_time = Instant::now();
254
255		// Try to acquire permit with timeout
256		let _permit = timeout(
257			Duration::from_millis(self.config.connection_timeout_ms),
258			self.semaphore.acquire(),
259		)
260		.await
261		.map_err(|_| "Connection timeout".to_string())?
262		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
263
264		let wait_time = start_time.elapsed().as_millis() as f64;
265
266		// Update wait time statistics
267		{
268			let mut stats = self.stats.write().await;
269			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
270				/ (stats.total_operations as f64 + 1.0);
271		}
272
273		// Find or create a healthy connection
274		let connection = self.find_or_create_connection().await?;
275
276		// Update statistics
277		{
278			let mut stats = self.stats.write().await;
279			stats.active_connections += 1;
280			stats.total_operations += 1;
281		}
282
283		trace!("[ConnectionPool] Connection acquired: {}", connection.id);
284		Ok(connection)
285	}
286
287	/// Release a connection back to the pool
288	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
289		let connection_id = handle.id.clone();
290
291		handle.last_used = Instant::now();
292
293		// Update connection in pool
294		{
295			let mut connections = self.connections.lock().await;
296			connections.insert(handle.id.clone(), handle.clone());
297		}
298
299		// Update statistics
300		{
301			let mut stats = self.stats.write().await;
302			stats.active_connections = stats.active_connections.saturating_sub(1);
303			stats.idle_connections += 1;
304		}
305
306		// Release the semaphore permit when the handle is dropped.
307		drop(handle);
308
309		trace!("[ConnectionPool] Connection released: {}", connection_id);
310	}
311
312	/// Find or create a healthy connection
313	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
314		let mut connections = self.connections.lock().await;
315
316		// Try to find a healthy connection
317		for (_id, handle) in connections.iter_mut() {
318			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
319				handle.last_used = Instant::now();
320				return Ok(handle.clone());
321			}
322		}
323
324		// No healthy connection found, create new one
325		let new_handle = ConnectionHandle::new();
326		connections.insert(new_handle.id.clone(), new_handle.clone());
327
328		// Update statistics
329		{
330			let mut stats = self.stats.write().await;
331			stats.total_connections += 1;
332			stats.healthy_connections += 1;
333		}
334
335		Ok(new_handle)
336	}
337
338	/// Start health monitoring
339	async fn start_health_monitoring(&self) {
340		let pool = Arc::new(self.clone());
341
342		tokio::spawn(async move {
343			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
344
345			while *pool.is_running.lock().await {
346				interval.tick().await;
347
348				if let Err(e) = pool.check_connection_health().await {
349					error!("[ConnectionPool] Health check failed: {}", e);
350				}
351			}
352		});
353	}
354
355	/// Start connection cleanup
356	async fn start_connection_cleanup(&self) {
357		let pool = Arc::new(self.clone());
358
359		tokio::spawn(async move {
360			// Check for stale connections every minute.
361			let mut interval = interval(Duration::from_secs(60));
362
363			while *pool.is_running.lock().await {
364				interval.tick().await;
365
366				let cleaned_count = pool.cleanup_stale_connections().await;
367				if cleaned_count > 0 {
368					debug!("[ConnectionPool] Cleaned {} stale connections", cleaned_count);
369				}
370			}
371		});
372	}
373
374	/// Initialize minimum connections
375	async fn initialize_min_connections(&self) {
376		let current_count = self.connections.lock().await.len();
377
378		if current_count < self.config.min_connections {
379			let needed = self.config.min_connections - current_count;
380
381			for _ in 0..needed {
382				let handle = ConnectionHandle::new();
383				let mut connections = self.connections.lock().await;
384				connections.insert(handle.id.clone(), handle);
385			}
386
387			debug!("[ConnectionPool] Initialized {} minimum connections", needed);
388		}
389	}
390
391	/// Check connection health
392	async fn check_connection_health(&self) -> Result<(), String> {
393		let mut connections = self.connections.lock().await;
394		let mut _health_checker = self.health_checker.lock().await;
395
396		let mut healthy_count = 0;
397
398		for (_id, handle) in connections.iter_mut() {
399			let is_healthy = _health_checker.check_connection_health(handle).await;
400			handle.update_health(is_healthy);
401
402			if handle.is_healthy() {
403				healthy_count += 1;
404			}
405		}
406
407		// Update statistics
408		{
409			let mut stats = self.stats.write().await;
410			stats.healthy_connections = healthy_count;
411			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
412
413			if stats.total_operations > 0 {
414				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
415			}
416		}
417
418		Ok(())
419	}
420
421	/// Cleanup stale connections
422	async fn cleanup_stale_connections(&self) -> usize {
423		let mut connections = self.connections.lock().await;
424		let _now = Instant::now();
425
426		let stale_ids:Vec<String> = connections
427			.iter()
428			.filter(|(_, handle)| {
429				handle.age().as_millis() > self.config.max_lifetime_ms as u128
430					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
431					|| !handle.is_healthy()
432			})
433			.map(|(id, _)| id.clone())
434			.collect();
435
436		for id in &stale_ids {
437			connections.remove(id);
438		}
439
440		// Update statistics
441		{
442			let mut stats = self.stats.write().await;
443			stats.total_connections = connections.len();
444			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
445		}
446
447		stale_ids.len()
448	}
449
450	/// Get pool statistics
451	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
452
453	/// Get active connection count
454	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
455
456	/// Get healthy connection count
457	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
458
459	/// Check if pool is running
460	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
461}
462
463impl Clone for ConnectionPool {
464	fn clone(&self) -> Self {
465		Self {
466			config:self.config.clone(),
467			connections:self.connections.clone(),
468			semaphore:self.semaphore.clone(),
469			wait_queue:self.wait_queue.clone(),
470			stats:self.stats.clone(),
471			health_checker:self.health_checker.clone(),
472			is_running:self.is_running.clone(),
473		}
474	}
475}
476
/// Probe that decides whether a connection responds quickly enough.
struct ConnectionHealthChecker {
	/// Longest probe duration that still counts as healthy.
	ping_timeout:Duration,
}
481
482impl ConnectionHealthChecker {
483	fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
484
485	/// Check connection health
486	async fn check_connection_health(&self, handle:&mut ConnectionHandle) -> bool {
487		// Simulate health check by ensuring connection can handle basic operations
488		// In a real implementation, this would send an actual ping message
489		let start_time = Instant::now();
490
491		// Simulate network latency
492		tokio::time::sleep(Duration::from_millis(10)).await;
493
494		let response_time = start_time.elapsed();
495
496		// Connection is healthy if response time is reasonable
497		response_time < self.ping_timeout
498	}
499}
500
501/// Utility functions for connection pooling
502impl ConnectionPool {
503	/// Create a connection pool with default configuration
504	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
505
506	/// Create a high-performance pool
507	pub fn high_performance_pool() -> Self {
508		Self::new(PoolConfig {
509			max_connections:50,
510			min_connections:10,
511			connection_timeout_ms:10000,
512			// Shorter lifetimes for high-performance mode: 3 minutes max, 30 seconds idle.
513			max_lifetime_ms:180000,
514			idle_timeout_ms:30000,
515			// More frequent health checks: every 15 seconds.
516			health_check_interval_ms:15000,
517		})
518	}
519
520	/// Create a conservative pool
521	pub fn conservative_pool() -> Self {
522		Self::new(PoolConfig {
523			max_connections:5,
524			min_connections:1,
525			connection_timeout_ms:60000,
526			// Longer lifetimes for conservative mode: 10 minutes max, 2 minutes idle.
527			max_lifetime_ms:600000,
528			idle_timeout_ms:120000,
529			// Less frequent health checks: every 60 seconds.
530			health_check_interval_ms:60000,
531		})
532	}
533
534	/// Calculate optimal pool size based on system resources
535	pub fn calculate_optimal_pool_size() -> usize {
536		let num_cpus = num_cpus::get();
537		// Calculate optimal pool size using 2x CPU count as a starting point,
538		// with minimum of 4 and maximum of 50 connections.
539		(num_cpus * 2).max(4).min(50)
540	}
541}
542
#[cfg(test)]
mod tests {
	use tokio::time::sleep;

	use super::*;

	/// The default pool carries the default configuration.
	#[tokio::test]
	async fn test_connection_pool_creation() {
		let default_pool = ConnectionPool::default_pool();
		assert_eq!(default_pool.config.max_connections, 10);
	}

	/// A fresh handle is healthy, and stays healthy after one success
	/// followed by one failure.
	#[tokio::test]
	async fn test_connection_handle_health() {
		let mut conn = ConnectionHandle::new();
		assert!(conn.is_healthy());

		// One success: still healthy, perfect success rate.
		conn.update_health(true);
		assert!(conn.is_healthy());
		assert_eq!(conn.success_rate(), 1.0);

		// One failure: the score drops but stays above the healthy
		// threshold, and the success rate halves.
		conn.update_health(false);
		assert!(conn.is_healthy());
		assert_eq!(conn.success_rate(), 0.5);
	}

	/// Start, check out, release, stop.
	#[tokio::test]
	async fn test_pool_lifecycle() {
		let pool = ConnectionPool::default_pool();

		pool.start().await.unwrap();
		assert!(pool.is_running().await);

		let checked_out = pool.get_connection().await.unwrap();
		assert!(checked_out.is_healthy());

		pool.release_connection(checked_out).await;

		pool.stop().await.unwrap();
		assert!(!pool.is_running().await);
	}

	/// The active and idle counters follow checkouts and releases.
	#[tokio::test]
	async fn test_pool_statistics() {
		let pool = ConnectionPool::default_pool();
		pool.start().await.unwrap();

		// Check three connections out one after another.
		let mut checked_out = Vec::with_capacity(3);
		for _ in 0..3 {
			let conn = pool.get_connection().await.unwrap();
			checked_out.push(conn);
		}

		let while_checked_out = pool.get_stats().await;
		assert_eq!(while_checked_out.active_connections, 3);

		// Hand every connection back.
		for conn in checked_out {
			pool.release_connection(conn).await;
		}

		let after_release = pool.get_stats().await;
		assert_eq!(after_release.active_connections, 0);
		assert_eq!(after_release.idle_connections, 3);

		pool.stop().await.unwrap();
	}

	/// Connections that outlive very short limits are removed by a cleanup
	/// pass.
	#[tokio::test]
	async fn test_connection_cleanup() {
		// 100 ms lifetime and 50 ms idle timeout so the test does not have to
		// wait long.
		let short_lived = PoolConfig { max_lifetime_ms:100, idle_timeout_ms:50, ..Default::default() };
		let pool = ConnectionPool::new(short_lived);

		pool.start().await.unwrap();

		let conn = pool.get_connection().await.unwrap();
		pool.release_connection(conn).await;

		// Let the connections exceed both limits.
		sleep(Duration::from_millis(200)).await;

		let removed = pool.cleanup_stale_connections().await;
		assert!(removed > 0);

		pool.stop().await.unwrap();
	}

	/// The suggested pool size stays within its documented bounds.
	#[test]
	fn test_optimal_pool_size_calculation() {
		let suggested = ConnectionPool::calculate_optimal_pool_size();
		assert!((4..=50).contains(&suggested));
	}
}