Skip to main content

Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs

1
2//! `Pool::Struct` - bounded connection pool with health
3//! monitoring, idle/lifetime cleanup, wait-queue timeouts, and
4//! statistics. Acquire via `get_connection` (drops a permit on
5//! the inner `Semaphore`); return via `release_connection`.
6//! The struct + 18-method impl + Clone + tests stay in one
7//! file - tightly coupled cluster.
8
9use std::{
10	collections::HashMap,
11	sync::Arc,
12	time::{Duration, Instant},
13};
14
15use tokio::{
16	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
17	time::{interval, timeout},
18};
19
20use crate::{
21	IPC::Enhanced::ConnectionPool::{
22		ConnectionHandle::Struct as ConnectionHandle,
23		HealthChecker::Struct as HealthChecker,
24		PoolConfig::Struct as PoolConfig,
25		PoolStats::Struct as PoolStats,
26	},
27	dev_log,
28};
29
/// Shared state of the connection pool.
///
/// Every mutable member sits behind an `Arc`, so clones of this struct
/// (for example the ones moved into the background tasks) observe and mutate
/// the same pool.
pub struct Struct {
	/// Configuration the pool was built with; read for the connection limits
	/// and for the timeout / interval values.
	pub config:PoolConfig,

	/// Pooled connection handles keyed by the handle's `id`.
	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,

	/// Semaphore created with `config.max_connections` permits; a permit is
	/// awaited (with a timeout) by `get_connection`.
	pub semaphore:Arc<Semaphore>,

	/// Notifiers for waiting callers. `stop` drains this list and notifies
	/// each entry.
	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,

	/// Pool statistics, returned (cloned) by `get_stats`.
	pub stats:Arc<RwLock<PoolStats>>,

	/// Checker used by the periodic health pass to probe each pooled handle.
	pub health_checker:Arc<AsyncMutex<HealthChecker>>,

	/// `true` between `start` and `stop`; polled by the background tasks to
	/// decide when to exit.
	pub is_running:Arc<AsyncMutex<bool>>,
}
45
46impl Struct {
47	pub fn new(config:PoolConfig) -> Self {
48		let max_connections = config.max_connections;
49
50		let min_connections = config.min_connections;
51
52		let pool = Self {
53			config:config.clone(),
54
55			connections:Arc::new(AsyncMutex::new(HashMap::new())),
56
57			semaphore:Arc::new(Semaphore::new(max_connections)),
58
59			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
60
61			stats:Arc::new(RwLock::new(PoolStats {
62				total_connections:0,
63				active_connections:0,
64				idle_connections:0,
65				healthy_connections:0,
66				max_connections,
67				min_connections,
68				wait_queue_size:0,
69				average_wait_time_ms:0.0,
70				total_operations:0,
71				successful_operations:0,
72				error_rate:0.0,
73			})),
74
75			health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
76
77			is_running:Arc::new(AsyncMutex::new(false)),
78		};
79
80		dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
81
82		pool
83	}
84
85	pub async fn start(&self) -> Result<(), String> {
86		{
87			let mut running = self.is_running.lock().await;
88
89			if *running {
90				return Ok(());
91			}
92
93			*running = true;
94		}
95
96		self.start_health_monitoring().await;
97
98		self.start_connection_cleanup().await;
99
100		self.initialize_min_connections().await;
101
102		dev_log!("ipc", "[ConnectionPool] Started connection pool");
103
104		Ok(())
105	}
106
107	pub async fn stop(&self) -> Result<(), String> {
108		{
109			let mut running = self.is_running.lock().await;
110
111			if !*running {
112				return Ok(());
113			}
114
115			*running = false;
116		}
117
118		{
119			let mut connections = self.connections.lock().await;
120
121			connections.clear();
122		}
123
124		{
125			let mut wait_queue = self.wait_queue.lock().await;
126
127			for notifier in wait_queue.drain(..) {
128				notifier.notify_one();
129			}
130		}
131
132		dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
133
134		Ok(())
135	}
136
137	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
138		let start_time = Instant::now();
139
140		let _permit = timeout(
141			Duration::from_millis(self.config.connection_timeout_ms),
142			self.semaphore.acquire(),
143		)
144		.await
145		.map_err(|_| "Connection timeout".to_string())?
146		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
147
148		let wait_time = start_time.elapsed().as_millis() as f64;
149
150		{
151			let mut stats = self.stats.write().await;
152
153			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
154				/ (stats.total_operations as f64 + 1.0);
155		}
156
157		let connection = self.find_or_create_connection().await?;
158
159		{
160			let mut stats = self.stats.write().await;
161
162			stats.active_connections += 1;
163
164			stats.total_operations += 1;
165		}
166
167		dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
168
169		Ok(connection)
170	}
171
172	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
173		let connection_id = handle.id.clone();
174
175		handle.last_used = Instant::now();
176
177		{
178			let mut connections = self.connections.lock().await;
179
180			connections.insert(handle.id.clone(), handle.clone());
181		}
182
183		{
184			let mut stats = self.stats.write().await;
185
186			stats.active_connections = stats.active_connections.saturating_sub(1);
187
188			stats.idle_connections += 1;
189		}
190
191		drop(handle);
192
193		dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
194	}
195
196	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
197		let mut connections = self.connections.lock().await;
198
199		for (_id, handle) in connections.iter_mut() {
200			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
201				handle.last_used = Instant::now();
202
203				return Ok(handle.clone());
204			}
205		}
206
207		let new_handle = ConnectionHandle::new();
208
209		connections.insert(new_handle.id.clone(), new_handle.clone());
210
211		{
212			let mut stats = self.stats.write().await;
213
214			stats.total_connections += 1;
215
216			stats.healthy_connections += 1;
217		}
218
219		Ok(new_handle)
220	}
221
222	async fn start_health_monitoring(&self) {
223		let pool = Arc::new(self.clone());
224
225		tokio::spawn(async move {
226			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
227
228			while *pool.is_running.lock().await {
229				interval.tick().await;
230
231				if let Err(e) = pool.check_connection_health().await {
232					dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
233				}
234			}
235		});
236	}
237
238	async fn start_connection_cleanup(&self) {
239		let pool = Arc::new(self.clone());
240
241		tokio::spawn(async move {
242			let mut interval = interval(Duration::from_secs(60));
243
244			while *pool.is_running.lock().await {
245				interval.tick().await;
246
247				let cleaned_count = pool.cleanup_stale_connections().await;
248				if cleaned_count > 0 {
249					dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
250				}
251			}
252		});
253	}
254
255	async fn initialize_min_connections(&self) {
256		let current_count = self.connections.lock().await.len();
257
258		if current_count < self.config.min_connections {
259			let needed = self.config.min_connections - current_count;
260
261			for _ in 0..needed {
262				let handle = ConnectionHandle::new();
263
264				let mut connections = self.connections.lock().await;
265
266				connections.insert(handle.id.clone(), handle);
267			}
268
269			dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
270		}
271	}
272
273	async fn check_connection_health(&self) -> Result<(), String> {
274		let mut connections = self.connections.lock().await;
275
276		let mut _health_checker = self.health_checker.lock().await;
277
278		let mut healthy_count = 0;
279
280		for (_id, handle) in connections.iter_mut() {
281			let is_healthy = _health_checker.check_connection_health(handle).await;
282
283			handle.update_health(is_healthy);
284
285			if handle.is_healthy() {
286				healthy_count += 1;
287			}
288		}
289
290		{
291			let mut stats = self.stats.write().await;
292
293			stats.healthy_connections = healthy_count;
294
295			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
296
297			if stats.total_operations > 0 {
298				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
299			}
300		}
301
302		Ok(())
303	}
304
305	pub async fn cleanup_stale_connections(&self) -> usize {
306		let mut connections = self.connections.lock().await;
307
308		let stale_ids:Vec<String> = connections
309			.iter()
310			.filter(|(_, handle)| {
311				handle.age().as_millis() > self.config.max_lifetime_ms as u128
312					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
313					|| !handle.is_healthy()
314			})
315			.map(|(id, _)| id.clone())
316			.collect();
317
318		for id in &stale_ids {
319			connections.remove(id);
320		}
321
322		{
323			let mut stats = self.stats.write().await;
324
325			stats.total_connections = connections.len();
326
327			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
328		}
329
330		stale_ids.len()
331	}
332
333	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
334
335	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
336
337	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
338
339	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
340
341	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
342
343	pub fn high_performance_pool() -> Self {
344		Self::new(PoolConfig {
345			max_connections:50,
346			min_connections:10,
347			connection_timeout_ms:10000,
348			max_lifetime_ms:180000,
349			idle_timeout_ms:30000,
350			health_check_interval_ms:15000,
351		})
352	}
353
354	pub fn conservative_pool() -> Self {
355		Self::new(PoolConfig {
356			max_connections:5,
357			min_connections:1,
358			connection_timeout_ms:60000,
359			max_lifetime_ms:600000,
360			idle_timeout_ms:120000,
361			health_check_interval_ms:60000,
362		})
363	}
364
365	pub fn calculate_optimal_pool_size() -> usize {
366		let num_cpus = num_cpus::get();
367
368		(num_cpus * 2).max(4).min(50)
369	}
370}
371
372impl Clone for Struct {
373	fn clone(&self) -> Self {
374		Self {
375			config:self.config.clone(),
376
377			connections:self.connections.clone(),
378
379			semaphore:self.semaphore.clone(),
380
381			wait_queue:self.wait_queue.clone(),
382
383			stats:self.stats.clone(),
384
385			health_checker:self.health_checker.clone(),
386
387			is_running:self.is_running.clone(),
388		}
389	}
390}