1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use log::{debug, error, info, trace, warn};
13use serde::{Deserialize, Serialize};
14use tokio::{
15 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
16 time::{interval, timeout},
17};
18use uuid::Uuid;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PoolConfig {
23 pub max_connections:usize,
24 pub min_connections:usize,
25 pub connection_timeout_ms:u64,
26 pub max_lifetime_ms:u64,
27 pub idle_timeout_ms:u64,
28 pub health_check_interval_ms:u64,
29}
30
31impl Default for PoolConfig {
32 fn default() -> Self {
33 Self {
34 max_connections:10,
35 min_connections:2,
36 connection_timeout_ms:30000,
38 max_lifetime_ms:300000,
40 idle_timeout_ms:60000,
42 health_check_interval_ms:30000,
44 }
45 }
46}
47
/// Coarse health classification carried by a [`ConnectionHandle`].
///
/// Within this file only `Healthy` is ever assigned (when a handle is
/// created); the numeric `health_score` drives the actual health decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionHealth {
    /// State given to every freshly created handle.
    Healthy,
    /// Not assigned anywhere in this file; presumably a failed connection (TODO confirm).
    Unhealthy,
    /// Not assigned anywhere in this file; presumably usable but impaired (TODO confirm).
    Degraded,
}
55
56#[derive(Debug, Clone)]
58pub struct ConnectionHandle {
59 pub id:String,
60 pub created_at:Instant,
61 pub last_used:Instant,
62 pub health_score:f64,
63 pub error_count:usize,
64 pub successful_operations:usize,
65 pub total_operations:usize,
66 pub is_active:bool,
67 pub reuse_count:u32,
68 pub health:ConnectionHealth,
69}
70
71impl ConnectionHandle {
72 pub fn new() -> Self {
74 Self {
75 id:Uuid::new_v4().to_string(),
76 created_at:Instant::now(),
77 last_used:Instant::now(),
78 health_score:100.0,
79 error_count:0,
80 successful_operations:0,
81 total_operations:0,
82 is_active:true,
83 reuse_count:0,
84 health:ConnectionHealth::Healthy,
85 }
86 }
87
88 pub fn update_health(&mut self, success:bool) {
90 self.last_used = Instant::now();
91 self.total_operations += 1;
92
93 if success {
94 self.successful_operations += 1;
95 self.health_score = (self.health_score + 2.0).min(100.0);
97 self.error_count = 0;
98 } else {
99 self.error_count += 1;
100 self.health_score = (self.health_score - 10.0).max(0.0);
102 }
103
104 let success_rate = if self.total_operations > 0 {
106 self.successful_operations as f64 / self.total_operations as f64
107 } else {
108 1.0
109 };
110
111 self.health_score = (self.health_score * 0.7 + success_rate * 100.0 * 0.3).max(0.0).min(100.0);
113 }
114
115 pub fn is_healthy(&self) -> bool {
117 self.health_score > 50.0 && self.error_count < 5 && self.is_active && self.age().as_secs() < 300
121 }
122
123 pub fn age(&self) -> Duration { self.created_at.elapsed() }
125
126 pub fn idle_time(&self) -> Duration { self.last_used.elapsed() }
128
129 pub fn success_rate(&self) -> f64 {
131 if self.total_operations == 0 {
132 1.0
133 } else {
134 self.successful_operations as f64 / self.total_operations as f64
135 }
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PoolStats {
142 pub total_connections:usize,
143 pub active_connections:usize,
144 pub idle_connections:usize,
145 pub healthy_connections:usize,
146 pub max_connections:usize,
147 pub min_connections:usize,
148 pub wait_queue_size:usize,
149 pub average_wait_time_ms:f64,
150 pub total_operations:u64,
151 pub successful_operations:u64,
152 pub error_rate:f64,
153}
154
155pub struct ConnectionPool {
157 pub config:PoolConfig,
158 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
159 pub semaphore:Arc<Semaphore>,
160 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
161 pub stats:Arc<RwLock<PoolStats>>,
162 pub health_checker:Arc<AsyncMutex<ConnectionHealthChecker>>,
163 pub is_running:Arc<AsyncMutex<bool>>,
164}
165
166impl ConnectionPool {
167 pub fn new(config:PoolConfig) -> Self {
169 let max_connections = config.max_connections;
170 let min_connections = config.min_connections;
171
172 let pool = Self {
173 config:config.clone(),
174 connections:Arc::new(AsyncMutex::new(HashMap::new())),
175 semaphore:Arc::new(Semaphore::new(max_connections)),
176 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
177 stats:Arc::new(RwLock::new(PoolStats {
178 total_connections:0,
179 active_connections:0,
180 idle_connections:0,
181 healthy_connections:0,
182 max_connections,
183 min_connections,
184 wait_queue_size:0,
185 average_wait_time_ms:0.0,
186 total_operations:0,
187 successful_operations:0,
188 error_rate:0.0,
189 })),
190 health_checker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
191 is_running:Arc::new(AsyncMutex::new(false)),
192 };
193
194 info!("[ConnectionPool] Created pool with max {} connections", max_connections);
195 pool
196 }
197
198 pub async fn start(&self) -> Result<(), String> {
200 {
201 let mut running = self.is_running.lock().await;
202 if *running {
203 return Ok(());
205 }
206 *running = true;
207 }
208
209 self.start_health_monitoring().await;
211
212 self.start_connection_cleanup().await;
214
215 self.initialize_min_connections().await;
217
218 info!("[ConnectionPool] Started connection pool");
219 Ok(())
220 }
221
222 pub async fn stop(&self) -> Result<(), String> {
224 {
225 let mut running = self.is_running.lock().await;
226 if !*running {
227 return Ok(());
229 }
230 *running = false;
231 }
232
233 {
235 let mut connections = self.connections.lock().await;
236 connections.clear();
237 }
238
239 {
241 let mut wait_queue = self.wait_queue.lock().await;
242 for notifier in wait_queue.drain(..) {
243 notifier.notify_one();
244 }
245 }
246
247 info!("[ConnectionPool] Stopped connection pool");
248 Ok(())
249 }
250
251 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
253 let start_time = Instant::now();
254
255 let _permit = timeout(
257 Duration::from_millis(self.config.connection_timeout_ms),
258 self.semaphore.acquire(),
259 )
260 .await
261 .map_err(|_| "Connection timeout".to_string())?
262 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
263
264 let wait_time = start_time.elapsed().as_millis() as f64;
265
266 {
268 let mut stats = self.stats.write().await;
269 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
270 / (stats.total_operations as f64 + 1.0);
271 }
272
273 let connection = self.find_or_create_connection().await?;
275
276 {
278 let mut stats = self.stats.write().await;
279 stats.active_connections += 1;
280 stats.total_operations += 1;
281 }
282
283 trace!("[ConnectionPool] Connection acquired: {}", connection.id);
284 Ok(connection)
285 }
286
287 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
289 let connection_id = handle.id.clone();
290
291 handle.last_used = Instant::now();
292
293 {
295 let mut connections = self.connections.lock().await;
296 connections.insert(handle.id.clone(), handle.clone());
297 }
298
299 {
301 let mut stats = self.stats.write().await;
302 stats.active_connections = stats.active_connections.saturating_sub(1);
303 stats.idle_connections += 1;
304 }
305
306 drop(handle);
308
309 trace!("[ConnectionPool] Connection released: {}", connection_id);
310 }
311
312 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
314 let mut connections = self.connections.lock().await;
315
316 for (_id, handle) in connections.iter_mut() {
318 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
319 handle.last_used = Instant::now();
320 return Ok(handle.clone());
321 }
322 }
323
324 let new_handle = ConnectionHandle::new();
326 connections.insert(new_handle.id.clone(), new_handle.clone());
327
328 {
330 let mut stats = self.stats.write().await;
331 stats.total_connections += 1;
332 stats.healthy_connections += 1;
333 }
334
335 Ok(new_handle)
336 }
337
338 async fn start_health_monitoring(&self) {
340 let pool = Arc::new(self.clone());
341
342 tokio::spawn(async move {
343 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
344
345 while *pool.is_running.lock().await {
346 interval.tick().await;
347
348 if let Err(e) = pool.check_connection_health().await {
349 error!("[ConnectionPool] Health check failed: {}", e);
350 }
351 }
352 });
353 }
354
355 async fn start_connection_cleanup(&self) {
357 let pool = Arc::new(self.clone());
358
359 tokio::spawn(async move {
360 let mut interval = interval(Duration::from_secs(60));
362
363 while *pool.is_running.lock().await {
364 interval.tick().await;
365
366 let cleaned_count = pool.cleanup_stale_connections().await;
367 if cleaned_count > 0 {
368 debug!("[ConnectionPool] Cleaned {} stale connections", cleaned_count);
369 }
370 }
371 });
372 }
373
374 async fn initialize_min_connections(&self) {
376 let current_count = self.connections.lock().await.len();
377
378 if current_count < self.config.min_connections {
379 let needed = self.config.min_connections - current_count;
380
381 for _ in 0..needed {
382 let handle = ConnectionHandle::new();
383 let mut connections = self.connections.lock().await;
384 connections.insert(handle.id.clone(), handle);
385 }
386
387 debug!("[ConnectionPool] Initialized {} minimum connections", needed);
388 }
389 }
390
391 async fn check_connection_health(&self) -> Result<(), String> {
393 let mut connections = self.connections.lock().await;
394 let mut _health_checker = self.health_checker.lock().await;
395
396 let mut healthy_count = 0;
397
398 for (_id, handle) in connections.iter_mut() {
399 let is_healthy = _health_checker.check_connection_health(handle).await;
400 handle.update_health(is_healthy);
401
402 if handle.is_healthy() {
403 healthy_count += 1;
404 }
405 }
406
407 {
409 let mut stats = self.stats.write().await;
410 stats.healthy_connections = healthy_count;
411 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
412
413 if stats.total_operations > 0 {
414 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
415 }
416 }
417
418 Ok(())
419 }
420
421 async fn cleanup_stale_connections(&self) -> usize {
423 let mut connections = self.connections.lock().await;
424 let _now = Instant::now();
425
426 let stale_ids:Vec<String> = connections
427 .iter()
428 .filter(|(_, handle)| {
429 handle.age().as_millis() > self.config.max_lifetime_ms as u128
430 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
431 || !handle.is_healthy()
432 })
433 .map(|(id, _)| id.clone())
434 .collect();
435
436 for id in &stale_ids {
437 connections.remove(id);
438 }
439
440 {
442 let mut stats = self.stats.write().await;
443 stats.total_connections = connections.len();
444 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
445 }
446
447 stale_ids.len()
448 }
449
450 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
452
453 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
455
456 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
458
459 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
461}
462
463impl Clone for ConnectionPool {
464 fn clone(&self) -> Self {
465 Self {
466 config:self.config.clone(),
467 connections:self.connections.clone(),
468 semaphore:self.semaphore.clone(),
469 wait_queue:self.wait_queue.clone(),
470 stats:self.stats.clone(),
471 health_checker:self.health_checker.clone(),
472 is_running:self.is_running.clone(),
473 }
474 }
475}
476
/// Probe used by the pool's periodic health check.
struct ConnectionHealthChecker {
    /// Upper bound on probe duration; a probe that takes at least this long counts as failed.
    ping_timeout:Duration,
}
481
482impl ConnectionHealthChecker {
483 fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
484
485 async fn check_connection_health(&self, handle:&mut ConnectionHandle) -> bool {
487 let start_time = Instant::now();
490
491 tokio::time::sleep(Duration::from_millis(10)).await;
493
494 let response_time = start_time.elapsed();
495
496 response_time < self.ping_timeout
498 }
499}
500
501impl ConnectionPool {
503 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
505
506 pub fn high_performance_pool() -> Self {
508 Self::new(PoolConfig {
509 max_connections:50,
510 min_connections:10,
511 connection_timeout_ms:10000,
512 max_lifetime_ms:180000,
514 idle_timeout_ms:30000,
515 health_check_interval_ms:15000,
517 })
518 }
519
520 pub fn conservative_pool() -> Self {
522 Self::new(PoolConfig {
523 max_connections:5,
524 min_connections:1,
525 connection_timeout_ms:60000,
526 max_lifetime_ms:600000,
528 idle_timeout_ms:120000,
529 health_check_interval_ms:60000,
531 })
532 }
533
534 pub fn calculate_optimal_pool_size() -> usize {
536 let num_cpus = num_cpus::get();
537 (num_cpus * 2).max(4).min(50)
540 }
541}
542
#[cfg(test)]
mod tests {
    use tokio::time::sleep;

    use super::*;

    /// The default pool carries the default configuration.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::default_pool();

        assert_eq!(pool.config.max_connections, 10);
    }

    /// A handle stays healthy through one success followed by one failure,
    /// and its success rate follows the recorded outcomes.
    #[tokio::test]
    async fn test_connection_handle_health() {
        let mut handle = ConnectionHandle::new();
        assert!(handle.is_healthy());

        handle.update_health(true);
        assert!(handle.is_healthy());
        assert_eq!(handle.success_rate(), 1.0);

        handle.update_health(false);
        assert!(handle.is_healthy());
        assert_eq!(handle.success_rate(), 0.5);
    }

    /// Start, acquire, release and stop toggle the running flag as expected.
    #[tokio::test]
    async fn test_pool_lifecycle() {
        let pool = ConnectionPool::default_pool();

        pool.start().await.unwrap();
        assert!(pool.is_running().await);

        let handle = pool.get_connection().await.unwrap();
        assert!(handle.is_healthy());
        pool.release_connection(handle).await;

        pool.stop().await.unwrap();
        assert!(!pool.is_running().await);
    }

    /// Active and idle counters follow acquisitions and releases.
    #[tokio::test]
    async fn test_pool_statistics() {
        let pool = ConnectionPool::default_pool();
        pool.start().await.unwrap();

        let mut handles = Vec::new();
        for _ in 0..3 {
            let handle = pool.get_connection().await.unwrap();
            handles.push(handle);
        }

        let stats = pool.get_stats().await;
        assert_eq!(stats.active_connections, 3);

        for handle in handles {
            pool.release_connection(handle).await;
        }

        let stats_after = pool.get_stats().await;
        assert_eq!(stats_after.active_connections, 0);
        assert_eq!(stats_after.idle_connections, 3);

        pool.stop().await.unwrap();
    }

    /// Handles that outlive the configured lifetime and idle timeout are swept.
    #[tokio::test]
    async fn test_connection_cleanup() {
        let config = PoolConfig { max_lifetime_ms:100, idle_timeout_ms:50, ..Default::default() };
        let pool = ConnectionPool::new(config);

        pool.start().await.unwrap();

        let handle = pool.get_connection().await.unwrap();
        pool.release_connection(handle).await;

        // Wait past both the lifetime and the idle timeout configured above.
        sleep(Duration::from_millis(200)).await;

        let cleaned_count = pool.cleanup_stale_connections().await;
        assert!(cleaned_count > 0);

        pool.stop().await.unwrap();
    }

    /// The suggested pool size always falls inside the documented bounds.
    #[test]
    fn test_optimal_pool_size_calculation() {
        let optimal_size = ConnectionPool::calculate_optimal_pool_size();

        assert!((4..=50).contains(&optimal_size));
    }
}