Mountain/IPC/Connection/
Manager.rs

1use std::{collections::HashMap, sync::Arc};
2
3use log::debug;
4use tokio::{
5	sync::{Mutex as AsyncMutex, Semaphore},
6	time::{Duration, timeout},
7};
8
9use super::{
10	Health::HealthChecker,
11	Types::{ConnectionHandle, ConnectionStats},
12};
13
14/// Connection manager (alias for ConnectionPool)
15///
16/// This is the main connection management structure, providing connection
17/// pooling with health monitoring and automatic cleanup.
18pub type ConnectionManager = ConnectionPool;
19
20/// Connection pool for IPC operations
21///
22/// This structure manages a pool of connections, preventing connection
23/// exhaustion by reusing connections and providing health monitoring.
24///
25/// ## Pool Architecture
26///
27/// ```text
28/// ConnectionPool
29///     |
30///     | Semaphore (limits max connections)
31///     v
32/// Active Connections (HashMap<id, ConnectionHandle>)
33///     |
34///     | Health Checker (background task)
35///     v
36/// Monitor health and update scores
37/// ```
38///
39/// ## Connection Lifecycle
40///
41/// 1. **Acquisition**: Get a connection from the pool (or create new)
42/// 2. **Usage**: Use the connection for operations
43/// 3. **Release**: Return the connection to the pool
44/// 4. **Cleanup**: Automatically remove stale/unhealthy connections
45///
46/// ## Health Monitoring
47///
48/// Each connection has:
49/// - Health score (0.0 to 100.0)
50/// - Error count
51/// - Last used timestamp
52/// - Background health checks every 30 seconds
53///
54/// ## Example Usage
55///
56/// ```rust,ignore
57/// let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
58///
59/// // Get a connection
60/// let handle = pool.GetConnection().await?;
61///
62/// // Use the connection...
63///
64/// // Release the connection
65/// pool.ReleaseConnection(handle).await;
66///
67/// // Get statistics
68/// let stats = pool.GetStats().await;
69/// ```
70pub struct ConnectionPool {
71	/// Maximum number of concurrent connections allowed
72	MaxConnections:usize,
73
74	/// Timeout for acquiring a connection from the pool
75	ConnectionTimeout:Duration,
76
77	/// Semaphore to limit concurrent connections
78	Semaphore:Arc<Semaphore>,
79
80	/// Map of active connections by ID
81	ActiveConnections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
82
83	/// Health checker for monitoring connection health
84	HealthChecker:Arc<AsyncMutex<HealthChecker>>,
85}
86
87impl ConnectionPool {
88	/// Create a new connection pool with specified parameters
89	///
90	/// ## Parameters
91	/// - `MaxConnections`: Maximum number of concurrent connections
92	/// - `ConnectionTimeout`: Timeout for acquiring a connection
93	///
94	/// ## Example
95	///
96	/// ```rust,ignore
97	/// let pool = ConnectionPool::new(10, Duration::from_secs(30));
98	/// ```
99	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100		debug!(
101			"[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
102			MaxConnections, ConnectionTimeout
103		);
104
105		Self {
106			MaxConnections,
107			ConnectionTimeout,
108			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
109			ActiveConnections:Arc::new(AsyncMutex::new(HashMap::new())),
110			HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
111		}
112	}
113
114	/// Create a connection pool with default settings
115	///
116	/// Default settings: 10 max connections, 30s timeout
117	pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
118
119	/// Get a connection handle from the pool with timeout
120	///
121	/// This method acquires a semaphore permit and creates a new connection
122	/// handle. If the pool is at capacity, it will wait until a connection
123	/// becomes available or the timeout is reached.
124	///
125	/// ## Returns
126	/// - `Ok(ConnectionHandle)`: New connection handle
127	/// - `Err(String)`: Error message if timeout or failure occurs
128	///
129	/// ## Example
130	///
131	/// ```rust,ignore
132	/// let handle = pool.GetConnection().await?;
133	/// ```
134	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
135		debug!("[ConnectionPool] Acquiring connection permit");
136
137		// Acquire semaphore permit with timeout
138		let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
139			.await
140			.map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
141			.map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
142
143		// Create new connection handle
144		let handle = ConnectionHandle::new();
145
146		// Add to active connections
147		{
148			let mut connections = self.ActiveConnections.lock().await;
149			connections.insert(handle.id.clone(), handle.clone());
150		}
151
152		debug!("[ConnectionPool] Connection {} acquired (permit released on drop)", handle.id);
153
154		// Start health monitoring for this connection
155		self.StartHealthMonitoring(&handle.id).await;
156
157		// The permit will be automatically released when dropped
158		drop(permit);
159
160		Ok(handle)
161	}
162
163	/// Release a connection handle back to the pool
164	///
165	/// This method removes the connection from the active connections map,
166	/// allowing the semaphore permit to be reused.
167	///
168	/// ## Parameters
169	/// - `handle`: The connection handle to release
170	///
171	/// ## Example
172	///
173	/// ```rust,ignore
174	/// pool.ReleaseConnection(handle).await;
175	/// ```
176	pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
177		debug!("[ConnectionPool] Releasing connection {}", handle.id);
178
179		{
180			let mut connections = self.ActiveConnections.lock().await;
181			connections.remove(&handle.id);
182		}
183
184		debug!("[ConnectionPool] Connection {} released", handle.id);
185	}
186
187	/// Get connection statistics for monitoring
188	///
189	/// This method returns aggregate statistics about the connection pool,
190	/// useful for monitoring and debugging.
191	///
192	/// ## Returns
193	/// Connection statistics including total connections, healthy connections,
194	/// utilization, etc.
195	///
196	/// ## Example
197	///
198	/// ```rust,ignore
199	/// let stats = pool.GetStats().await;
200	/// println!("Pool stats: {:?}", stats.summary());
201	/// ```
202	pub async fn GetStats(&self) -> ConnectionStats {
203		let connections = self.ActiveConnections.lock().await;
204		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
205
206		ConnectionStats {
207			total_connections:connections.len(),
208			healthy_connections,
209			max_connections:self.MaxConnections,
210			available_permits:self.Semaphore.available_permits(),
211			connection_timeout:self.ConnectionTimeout,
212		}
213	}
214
215	/// Clean up stale connections
216	///
217	/// This method removes connections that have not been used recently
218	/// or are unhealthy, preventing memory leaks and resource exhaustion.
219	///
220	/// Stale connection criteria:
221	/// - Unused for 5 minutes (300 seconds)
222	/// - Not healthy (health score <= 50 or error count >= 5)
223	///
224	/// ## Returns
225	/// The number of stale connections removed
226	///
227	/// ## Example
228	///
229	/// ```rust,ignore
230	/// let cleaned = pool.CleanUpStaleConnections().await;
231	/// println!("Cleaned up {} stale connections", cleaned);
232	/// ```
233	pub async fn CleanUpStaleConnections(&self) -> usize {
234		let mut connections = self.ActiveConnections.lock().await;
235		let now = std::time::SystemTime::now();
236		let stale_threshold = Duration::from_secs(300); // 5 minutes
237
238		let stale_ids:Vec<String> = connections
239			.iter()
240			.filter(|(_, handle)| {
241				// Check if connection is stale using SystemTime
242				let is_stale_by_time = match now.duration_since(handle.last_used) {
243					Ok(idle_time) => idle_time > stale_threshold,
244					Err(_) => true, // If time went backwards, consider it stale
245				};
246				is_stale_by_time || !handle.is_healthy()
247			})
248			.map(|(id, _)| id.clone())
249			.collect();
250
251		let stale_count = stale_ids.len();
252		for id in stale_ids {
253			debug!("[ConnectionPool] Removing stale connection {}", id);
254			connections.remove(&id);
255		}
256
257		if stale_count > 0 {
258			debug!("[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
259		}
260
261		stale_count
262	}
263
264	/// Start health monitoring for a connection
265	///
266	/// This method spawns a background task that periodically checks the
267	/// health of the connection and updates its health score.
268	///
269	/// ## Parameters
270	/// - `connection_id`: The ID of the connection to monitor
271	async fn StartHealthMonitoring(&self, connection_id:&str) {
272		let health_checker = self.HealthChecker.clone();
273		let active_connections = self.ActiveConnections.clone();
274		let connection_id = connection_id.to_string();
275
276		tokio::spawn(async move {
277			let mut interval = tokio::time::interval(Duration::from_secs(30));
278
279			loop {
280				interval.tick().await;
281
282				let mut checker = health_checker.lock().await;
283				let mut connections = match active_connections.try_lock() {
284					Ok(conns) => conns,
285					Err(_) => continue,
286				};
287
288				if let Some(handle) = connections.get_mut(&connection_id) {
289					let is_healthy = checker.check_connection_health(handle).await;
290					handle.update_health(is_healthy);
291
292					if !handle.is_healthy() {
293						debug!(
294							"[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
295							handle.id, handle.health_score, handle.error_count
296						);
297					}
298				} else {
299					// Connection removed from pool, stop monitoring
300					debug!(
301						"[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
302						connection_id
303					);
304					break;
305				}
306			}
307		});
308	}
309
310	/// Get the maximum number of connections
311	pub fn max_connections(&self) -> usize { self.MaxConnections }
312
313	/// Get the connection timeout
314	pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
315
316	/// Get the number of available permits
317	pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
318
319	/// Get the number of active connections
320	pub async fn active_connections(&self) -> usize {
321		let connections = self.ActiveConnections.lock().await;
322		connections.len()
323	}
324}
325
#[cfg(test)]
mod tests {
	use super::*;

	/// A freshly built pool reports its configuration and is empty.
	#[tokio::test]
	async fn test_connection_pool_creation() {
		let pool = ConnectionPool::new(10, Duration::from_secs(30));
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
		assert_eq!(pool.available_permits(), 10);
		assert_eq!(pool.active_connections().await, 0);
	}

	/// `default()` uses 10 connections and a 30 second timeout.
	#[tokio::test]
	async fn test_default_connection_pool() {
		let pool = ConnectionPool::default();
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
	}

	/// Acquiring consumes a permit; releasing gives it back.
	#[tokio::test]
	async fn test_get_and_release_connection() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		// Get a connection
		let handle = pool.GetConnection().await.unwrap();
		assert_eq!(pool.active_connections().await, 1);
		assert_eq!(pool.available_permits(), 4); // One permit used

		// Release the connection
		pool.ReleaseConnection(handle).await;
		assert_eq!(pool.active_connections().await, 0);
		assert_eq!(pool.available_permits(), 5); // Permit restored
	}

	/// A full pool blocks further acquisition until a connection is released.
	#[tokio::test]
	async fn test_multiple_connections() {
		let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));

		let mut handles = Vec::new();
		for _ in 0..3 {
			handles.push(pool.GetConnection().await.unwrap());
		}

		assert_eq!(pool.active_connections().await, 3);
		assert_eq!(pool.available_permits(), 0);

		// Try to get one more - the outer 1s timeout fires before the pool's 5s
		let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;
		assert!(result.is_err()); // Timeout

		// Release one connection (taken out of the list so it is released once)
		let first = handles.swap_remove(0);
		pool.ReleaseConnection(first).await;
		assert_eq!(pool.available_permits(), 1);

		// Now we can get another
		let handle = pool.GetConnection().await.unwrap();
		assert_eq!(pool.available_permits(), 0);

		// Release all
		for remaining in handles {
			pool.ReleaseConnection(remaining).await;
		}
		pool.ReleaseConnection(handle).await;

		assert_eq!(pool.active_connections().await, 0);
		assert_eq!(pool.available_permits(), 3);
	}

	/// Statistics follow the number of tracked connections.
	#[tokio::test]
	async fn test_connection_stats() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 0);
		assert_eq!(stats.healthy_connections, 0);
		assert_eq!(stats.max_connections, 5);
		assert_eq!(stats.utilization(), 0.0);

		// Add some connections; the pool keeps tracking them even though the
		// returned handles are discarded here.
		for _ in 0..3 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 3);
		assert!(stats.healthy_connections > 0);
		assert!(stats.utilization() > 0.0);
	}

	/// A connection whose stored handle is old and degraded is evicted.
	#[tokio::test]
	async fn test_cleanup_stale_connections() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		let handle = pool.GetConnection().await.unwrap();

		// Age the handle stored inside the pool. Changing the returned handle
		// would have no effect: the pool keeps its own clone.
		{
			let mut connections = pool.ActiveConnections.lock().await;
			let stored = connections.get_mut(&handle.id).expect("pool must track the acquired connection");

			stored.last_used = std::time::SystemTime::now()
				.checked_sub(Duration::from_secs(360))
				.expect("current time minus six minutes must be representable");
			stored.health_score = 25.0; // Unhealthy
		}

		let cleaned = pool.CleanUpStaleConnections().await;
		assert_eq!(cleaned, 1);
		assert_eq!(pool.active_connections().await, 0);

		// Releasing an already evicted connection must be harmless.
		pool.ReleaseConnection(handle).await;
		assert_eq!(pool.active_connections().await, 0);
	}

	/// Utilization is reported as a percentage of the maximum.
	#[tokio::test]
	async fn test_pool_utilization() {
		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

		assert_eq!(pool.GetStats().await.utilization(), 0.0);

		// Use half the connections
		for _ in 0..5 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.utilization(), 50.0);
	}
}
454}