Skip to main content

Mountain/IPC/Enhanced/PerformanceDashboard/
Dashboard.rs

1
//! `PerformanceDashboard` aggregator and its impl. Holds the
//! metric deque, trace store, alert deque, statistics cell, and
//! the `is_running` lifecycle flag. The method bodies are tightly
//! coupled to the sibling DTOs, so the impl stays in a single file
//! (per the "tightly-coupled cluster" exception).
7
8use std::{
9	collections::{HashMap, VecDeque},
10	sync::Arc,
11	time::{Duration, SystemTime},
12};
13
14use tokio::{
15	sync::{Mutex as AsyncMutex, RwLock},
16	time::interval,
17};
18
19use crate::{
20	IPC::Enhanced::PerformanceDashboard::{
21		AlertSeverity::Enum as AlertSeverity,
22		DashboardConfig::Struct as DashboardConfig,
23		DashboardStatistics::Struct as DashboardStatistics,
24		LogLevel::Enum as LogLevel,
25		MetricType::Enum as MetricType,
26		PerformanceAlert::Struct as PerformanceAlert,
27		PerformanceMetric::Struct as PerformanceMetric,
28		TraceLog::Struct as TraceLog,
29		TraceSpan::Struct as TraceSpan,
30	},
31	dev_log,
32};
33
34pub struct Struct {
35	pub(super) config:DashboardConfig,
36
37	pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,
38
39	pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,
40
41	pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,
42
43	pub(super) statistics:Arc<RwLock<DashboardStatistics>>,
44
45	pub(super) is_running:Arc<AsyncMutex<bool>>,
46}
47
48impl Struct {
49	pub fn new(config:DashboardConfig) -> Self {
50		let config_clone = config.clone();
51
52		let dashboard = Self {
53			config,
54
55			metrics:Arc::new(RwLock::new(VecDeque::new())),
56
57			traces:Arc::new(RwLock::new(HashMap::new())),
58
59			alerts:Arc::new(RwLock::new(VecDeque::new())),
60
61			statistics:Arc::new(RwLock::new(DashboardStatistics {
62				total_metrics_collected:0,
63				total_traces_collected:0,
64				total_alerts_triggered:0,
65				average_processing_time_ms:0.0,
66				peak_processing_time_ms:0,
67				error_rate_percentage:0.0,
68				throughput_messages_per_second:0.0,
69				memory_usage_mb:0.0,
70				last_update:SystemTime::now()
71					.duration_since(SystemTime::UNIX_EPOCH)
72					.unwrap_or_default()
73					.as_secs(),
74			})),
75
76			is_running:Arc::new(AsyncMutex::new(false)),
77		};
78
79		dev_log!(
80			"ipc",
81			"[PerformanceDashboard] Created dashboard with {}ms update interval",
82			config_clone.update_interval_ms
83		);
84
85		dashboard
86	}
87
88	pub async fn start(&self) -> Result<(), String> {
89		{
90			let mut running = self.is_running.lock().await;
91
92			if *running {
93				return Ok(());
94			}
95
96			*running = true;
97		}
98
99		self.start_metrics_collection().await;
100
101		self.start_alert_monitoring().await;
102
103		self.start_data_cleanup().await;
104
105		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");
106
107		Ok(())
108	}
109
110	pub async fn stop(&self) -> Result<(), String> {
111		{
112			let mut running = self.is_running.lock().await;
113
114			if !*running {
115				return Ok(());
116			}
117
118			*running = false;
119		}
120
121		{
122			let mut metrics = self.metrics.write().await;
123
124			metrics.clear();
125		}
126
127		{
128			let mut traces = self.traces.write().await;
129
130			traces.clear();
131		}
132
133		{
134			let mut alerts = self.alerts.write().await;
135
136			alerts.clear();
137		}
138
139		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");
140
141		Ok(())
142	}
143
144	pub async fn record_metric(&self, metric:PerformanceMetric) {
145		let mut metrics = self.metrics.write().await;
146
147		metrics.push_back(metric.clone());
148
149		drop(metrics);
150
151		self.update_statistics().await;
152
153		self.check_alerts(&metric).await;
154
155		dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
156	}
157
158	pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
159		let trace_id = Self::generate_trace_id();
160
161		let span_id = Self::generate_span_id();
162
163		let span = TraceSpan {
164			trace_id:trace_id.clone(),
165
166			span_id:span_id.clone(),
167
168			parent_span_id:None,
169
170			operation_name,
171
172			start_time:SystemTime::now()
173				.duration_since(SystemTime::UNIX_EPOCH)
174				.unwrap_or_default()
175				.as_millis() as u64,
176
177			end_time:None,
178
179			duration_ms:None,
180
181			tags:HashMap::new(),
182
183			logs:Vec::new(),
184		};
185
186		{
187			let mut traces = self.traces.write().await;
188
189			traces.insert(span_id.clone(), span.clone());
190		}
191
192		{
193			let mut stats = self.statistics.write().await;
194
195			stats.total_traces_collected += 1;
196		}
197
198		span
199	}
200
201	pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
202		let mut traces = self.traces.write().await;
203
204		if let Some(span) = traces.get_mut(span_id) {
205			let end_time = SystemTime::now()
206				.duration_since(SystemTime::UNIX_EPOCH)
207				.unwrap_or_default()
208				.as_millis() as u64;
209
210			span.end_time = Some(end_time);
211
212			span.duration_ms = Some(end_time.saturating_sub(span.start_time));
213
214			dev_log!(
215				"ipc",
216				"[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
217				span.operation_name,
218				span.duration_ms.unwrap_or(0)
219			);
220
221			Ok(())
222		} else {
223			Err(format!("Trace span not found: {}", span_id))
224		}
225	}
226
227	pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
228		let mut traces = self.traces.write().await;
229
230		if let Some(span) = traces.get_mut(span_id) {
231			span.logs.push(log);
232
233			Ok(())
234		} else {
235			Err(format!("Trace span not found: {}", span_id))
236		}
237	}
238
239	async fn start_metrics_collection(&self) {
240		let dashboard = Arc::new(self.clone());
241
242		tokio::spawn(async move {
243			let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));
244
245			while *dashboard.is_running.lock().await {
246				interval.tick().await;
247				dashboard.collect_system_metrics().await;
248				dashboard.update_statistics().await;
249			}
250		});
251	}
252
253	async fn start_alert_monitoring(&self) {
254		let dashboard = Arc::new(self.clone());
255
256		tokio::spawn(async move {
257			let mut interval = interval(Duration::from_secs(10));
258
259			while *dashboard.is_running.lock().await {
260				interval.tick().await;
261				dashboard.check_performance_alerts().await;
262			}
263		});
264	}
265
266	async fn start_data_cleanup(&self) {
267		let dashboard = Arc::new(self.clone());
268
269		tokio::spawn(async move {
270			let mut interval = interval(Duration::from_secs(3600));
271
272			while *dashboard.is_running.lock().await {
273				interval.tick().await;
274				dashboard.cleanup_old_data().await;
275			}
276		});
277	}
278
279	async fn collect_system_metrics(&self) {
280		if let Ok(memory_usage) = Self::get_memory_usage() {
281			let metric = PerformanceMetric {
282				metric_type:MetricType::MemoryUsage,
283
284				value:memory_usage,
285
286				timestamp:SystemTime::now()
287					.duration_since(SystemTime::UNIX_EPOCH)
288					.unwrap_or_default()
289					.as_millis() as u64,
290
291				channel:None,
292
293				tags:HashMap::new(),
294			};
295
296			self.record_metric(metric).await;
297		}
298
299		if let Ok(cpu_usage) = Self::get_cpu_usage() {
300			let metric = PerformanceMetric {
301				metric_type:MetricType::CpuUsage,
302
303				value:cpu_usage,
304
305				timestamp:SystemTime::now()
306					.duration_since(SystemTime::UNIX_EPOCH)
307					.unwrap_or_default()
308					.as_millis() as u64,
309
310				channel:None,
311
312				tags:HashMap::new(),
313			};
314
315			self.record_metric(metric).await;
316		}
317	}
318
319	async fn update_statistics(&self) {
320		let metrics = self.metrics.read().await;
321
322		let mut stats = self.statistics.write().await;
323
324		let processing_metrics:Vec<&PerformanceMetric> = metrics
325			.iter()
326			.filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
327			.collect();
328
329		if !processing_metrics.is_empty() {
330			let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
331
332			stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
333
334			stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
335		}
336
337		let error_metrics:Vec<&PerformanceMetric> = metrics
338			.iter()
339			.filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
340			.collect();
341
342		if !error_metrics.is_empty() {
343			let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
344
345			stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
346		}
347
348		let throughput_metrics:Vec<&PerformanceMetric> = metrics
349			.iter()
350			.filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
351			.collect();
352
353		if !throughput_metrics.is_empty() {
354			let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
355
356			stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
357		}
358
359		let memory_metrics:Vec<&PerformanceMetric> = metrics
360			.iter()
361			.filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
362			.collect();
363
364		if !memory_metrics.is_empty() {
365			let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
366
367			stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
368		}
369
370		stats.last_update = SystemTime::now()
371			.duration_since(SystemTime::UNIX_EPOCH)
372			.unwrap_or_default()
373			.as_secs();
374	}
375
376	async fn check_alerts(&self, metric:&PerformanceMetric) {
377		let threshold = match metric.metric_type {
378			MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
379
380			MetricType::ErrorRate => 5.0,
381
382			MetricType::MemoryUsage => 1024.0,
383
384			MetricType::CpuUsage => 90.0,
385
386			_ => return,
387		};
388
389		if metric.value > threshold {
390			let severity = match metric.value / threshold {
391				ratio if ratio > 5.0 => AlertSeverity::Critical,
392
393				ratio if ratio > 3.0 => AlertSeverity::High,
394
395				ratio if ratio > 2.0 => AlertSeverity::Medium,
396
397				_ => AlertSeverity::Low,
398			};
399
400			let alert = PerformanceAlert {
401				alert_id:Self::generate_alert_id(),
402
403				metric_type:metric.metric_type.clone(),
404
405				threshold,
406
407				current_value:metric.value,
408
409				timestamp:metric.timestamp,
410
411				channel:metric.channel.clone(),
412
413				severity,
414
415				message:format!(
416					"{} exceeded threshold: {} > {}",
417					Self::metric_type_name(&metric.metric_type),
418					metric.value,
419					threshold
420				),
421			};
422
423			{
424				let mut alerts = self.alerts.write().await;
425
426				alerts.push_back(alert.clone());
427			}
428
429			{
430				let mut stats = self.statistics.write().await;
431
432				stats.total_alerts_triggered += 1;
433			}
434
435			dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
436		}
437	}
438
439	async fn check_performance_alerts(&self) {
440		dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
441	}
442
443	async fn cleanup_old_data(&self) {
444		let retention_threshold = SystemTime::now()
445			.duration_since(SystemTime::UNIX_EPOCH)
446			.unwrap_or_default()
447			.as_secs()
448			- (self.config.metrics_retention_hours * 3600);
449
450		{
451			let mut metrics = self.metrics.write().await;
452
453			metrics.retain(|m| m.timestamp >= retention_threshold);
454		}
455
456		{
457			let mut traces = self.traces.write().await;
458
459			traces.retain(|_, span| span.start_time >= retention_threshold);
460
461			if traces.len() > self.config.max_traces_stored {
462				let excess = traces.len() - self.config.max_traces_stored;
463
464				let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
465
466				for key in keys_to_remove {
467					traces.remove(&key);
468				}
469			}
470		}
471
472		{
473			let mut alerts = self.alerts.write().await;
474
475			alerts.retain(|a| a.timestamp >= retention_threshold);
476		}
477
478		dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
479	}
480
481	fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }
482
483	fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }
484
485	fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }
486
487	fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }
488
489	fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
490
491	fn metric_type_name(metric_type:&MetricType) -> &'static str {
492		match metric_type {
493			MetricType::MessageProcessingTime => "Message Processing Time",
494
495			MetricType::ConnectionLatency => "Connection Latency",
496
497			MetricType::MemoryUsage => "Memory Usage",
498
499			MetricType::CpuUsage => "CPU Usage",
500
501			MetricType::NetworkThroughput => "Network Throughput",
502
503			MetricType::ErrorRate => "Error Rate",
504
505			MetricType::QueueSize => "Queue Size",
506		}
507	}
508
509	pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
510
511	pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
512		let metrics = self.metrics.read().await;
513
514		metrics.iter().rev().take(limit).cloned().collect()
515	}
516
517	pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
518		let alerts = self.alerts.read().await;
519
520		alerts.iter().rev().cloned().collect()
521	}
522
523	pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
524		let traces = self.traces.read().await;
525
526		traces.values().find(|span| span.trace_id == trace_id).cloned()
527	}
528
529	pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
530
531	pub fn high_frequency_dashboard() -> Self {
532		Self::new(DashboardConfig {
533			update_interval_ms:1000,
534			metrics_retention_hours:1,
535			alert_threshold_ms:500,
536			trace_sampling_rate:1.0,
537			max_traces_stored:5000,
538		})
539	}
540
541	pub fn create_metric(
542		metric_type:MetricType,
543
544		value:f64,
545
546		channel:Option<String>,
547
548		tags:HashMap<String, String>,
549	) -> PerformanceMetric {
550		PerformanceMetric {
551			metric_type,
552
553			value,
554
555			timestamp:SystemTime::now()
556				.duration_since(SystemTime::UNIX_EPOCH)
557				.unwrap_or_default()
558				.as_millis() as u64,
559
560			channel,
561
562			tags,
563		}
564	}
565
566	pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
567		TraceLog {
568			timestamp:SystemTime::now()
569				.duration_since(SystemTime::UNIX_EPOCH)
570				.unwrap_or_default()
571				.as_millis() as u64,
572
573			message,
574
575			level,
576
577			fields,
578		}
579	}
580
581	pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
582		let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
583
584		let error_score = 100.0 * (1.0 - error_rate / 100.0);
585
586		let throughput_score = throughput / 1000.0;
587
588		(time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
589			.max(0.0)
590			.min(100.0)
591	}
592
593	pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
594		match metric_type {
595			MetricType::MessageProcessingTime => format!("{:.2}ms", value),
596
597			MetricType::ConnectionLatency => format!("{:.2}ms", value),
598
599			MetricType::MemoryUsage => format!("{:.2}MB", value),
600
601			MetricType::CpuUsage => format!("{:.2}%", value),
602
603			MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
604
605			MetricType::ErrorRate => format!("{:.2}%", value),
606
607			MetricType::QueueSize => format!("{:.0}", value),
608		}
609	}
610}
611
612impl Clone for Struct {
613	fn clone(&self) -> Self {
614		Self {
615			config:self.config.clone(),
616
617			metrics:self.metrics.clone(),
618
619			traces:self.traces.clone(),
620
621			alerts:self.alerts.clone(),
622
623			statistics:self.statistics.clone(),
624
625			is_running:Arc::new(AsyncMutex::new(false)),
626		}
627	}
628}