Mountain/IPC/Enhanced/PerformanceDashboard/
Dashboard.rs1
2use std::{
9 collections::{HashMap, VecDeque},
10 sync::Arc,
11 time::{Duration, SystemTime},
12};
13
14use tokio::{
15 sync::{Mutex as AsyncMutex, RwLock},
16 time::interval,
17};
18
19use crate::{
20 IPC::Enhanced::PerformanceDashboard::{
21 AlertSeverity::Enum as AlertSeverity,
22 DashboardConfig::Struct as DashboardConfig,
23 DashboardStatistics::Struct as DashboardStatistics,
24 LogLevel::Enum as LogLevel,
25 MetricType::Enum as MetricType,
26 PerformanceAlert::Struct as PerformanceAlert,
27 PerformanceMetric::Struct as PerformanceMetric,
28 TraceLog::Struct as TraceLog,
29 TraceSpan::Struct as TraceSpan,
30 },
31 dev_log,
32};
33
34pub struct Struct {
35 pub(super) config:DashboardConfig,
36
37 pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,
38
39 pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,
40
41 pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,
42
43 pub(super) statistics:Arc<RwLock<DashboardStatistics>>,
44
45 pub(super) is_running:Arc<AsyncMutex<bool>>,
46}
47
48impl Struct {
49 pub fn new(config:DashboardConfig) -> Self {
50 let config_clone = config.clone();
51
52 let dashboard = Self {
53 config,
54
55 metrics:Arc::new(RwLock::new(VecDeque::new())),
56
57 traces:Arc::new(RwLock::new(HashMap::new())),
58
59 alerts:Arc::new(RwLock::new(VecDeque::new())),
60
61 statistics:Arc::new(RwLock::new(DashboardStatistics {
62 total_metrics_collected:0,
63 total_traces_collected:0,
64 total_alerts_triggered:0,
65 average_processing_time_ms:0.0,
66 peak_processing_time_ms:0,
67 error_rate_percentage:0.0,
68 throughput_messages_per_second:0.0,
69 memory_usage_mb:0.0,
70 last_update:SystemTime::now()
71 .duration_since(SystemTime::UNIX_EPOCH)
72 .unwrap_or_default()
73 .as_secs(),
74 })),
75
76 is_running:Arc::new(AsyncMutex::new(false)),
77 };
78
79 dev_log!(
80 "ipc",
81 "[PerformanceDashboard] Created dashboard with {}ms update interval",
82 config_clone.update_interval_ms
83 );
84
85 dashboard
86 }
87
88 pub async fn start(&self) -> Result<(), String> {
89 {
90 let mut running = self.is_running.lock().await;
91
92 if *running {
93 return Ok(());
94 }
95
96 *running = true;
97 }
98
99 self.start_metrics_collection().await;
100
101 self.start_alert_monitoring().await;
102
103 self.start_data_cleanup().await;
104
105 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");
106
107 Ok(())
108 }
109
110 pub async fn stop(&self) -> Result<(), String> {
111 {
112 let mut running = self.is_running.lock().await;
113
114 if !*running {
115 return Ok(());
116 }
117
118 *running = false;
119 }
120
121 {
122 let mut metrics = self.metrics.write().await;
123
124 metrics.clear();
125 }
126
127 {
128 let mut traces = self.traces.write().await;
129
130 traces.clear();
131 }
132
133 {
134 let mut alerts = self.alerts.write().await;
135
136 alerts.clear();
137 }
138
139 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");
140
141 Ok(())
142 }
143
144 pub async fn record_metric(&self, metric:PerformanceMetric) {
145 let mut metrics = self.metrics.write().await;
146
147 metrics.push_back(metric.clone());
148
149 drop(metrics);
150
151 self.update_statistics().await;
152
153 self.check_alerts(&metric).await;
154
155 dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
156 }
157
158 pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
159 let trace_id = Self::generate_trace_id();
160
161 let span_id = Self::generate_span_id();
162
163 let span = TraceSpan {
164 trace_id:trace_id.clone(),
165
166 span_id:span_id.clone(),
167
168 parent_span_id:None,
169
170 operation_name,
171
172 start_time:SystemTime::now()
173 .duration_since(SystemTime::UNIX_EPOCH)
174 .unwrap_or_default()
175 .as_millis() as u64,
176
177 end_time:None,
178
179 duration_ms:None,
180
181 tags:HashMap::new(),
182
183 logs:Vec::new(),
184 };
185
186 {
187 let mut traces = self.traces.write().await;
188
189 traces.insert(span_id.clone(), span.clone());
190 }
191
192 {
193 let mut stats = self.statistics.write().await;
194
195 stats.total_traces_collected += 1;
196 }
197
198 span
199 }
200
201 pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
202 let mut traces = self.traces.write().await;
203
204 if let Some(span) = traces.get_mut(span_id) {
205 let end_time = SystemTime::now()
206 .duration_since(SystemTime::UNIX_EPOCH)
207 .unwrap_or_default()
208 .as_millis() as u64;
209
210 span.end_time = Some(end_time);
211
212 span.duration_ms = Some(end_time.saturating_sub(span.start_time));
213
214 dev_log!(
215 "ipc",
216 "[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
217 span.operation_name,
218 span.duration_ms.unwrap_or(0)
219 );
220
221 Ok(())
222 } else {
223 Err(format!("Trace span not found: {}", span_id))
224 }
225 }
226
227 pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
228 let mut traces = self.traces.write().await;
229
230 if let Some(span) = traces.get_mut(span_id) {
231 span.logs.push(log);
232
233 Ok(())
234 } else {
235 Err(format!("Trace span not found: {}", span_id))
236 }
237 }
238
239 async fn start_metrics_collection(&self) {
240 let dashboard = Arc::new(self.clone());
241
242 tokio::spawn(async move {
243 let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));
244
245 while *dashboard.is_running.lock().await {
246 interval.tick().await;
247 dashboard.collect_system_metrics().await;
248 dashboard.update_statistics().await;
249 }
250 });
251 }
252
253 async fn start_alert_monitoring(&self) {
254 let dashboard = Arc::new(self.clone());
255
256 tokio::spawn(async move {
257 let mut interval = interval(Duration::from_secs(10));
258
259 while *dashboard.is_running.lock().await {
260 interval.tick().await;
261 dashboard.check_performance_alerts().await;
262 }
263 });
264 }
265
266 async fn start_data_cleanup(&self) {
267 let dashboard = Arc::new(self.clone());
268
269 tokio::spawn(async move {
270 let mut interval = interval(Duration::from_secs(3600));
271
272 while *dashboard.is_running.lock().await {
273 interval.tick().await;
274 dashboard.cleanup_old_data().await;
275 }
276 });
277 }
278
279 async fn collect_system_metrics(&self) {
280 if let Ok(memory_usage) = Self::get_memory_usage() {
281 let metric = PerformanceMetric {
282 metric_type:MetricType::MemoryUsage,
283
284 value:memory_usage,
285
286 timestamp:SystemTime::now()
287 .duration_since(SystemTime::UNIX_EPOCH)
288 .unwrap_or_default()
289 .as_millis() as u64,
290
291 channel:None,
292
293 tags:HashMap::new(),
294 };
295
296 self.record_metric(metric).await;
297 }
298
299 if let Ok(cpu_usage) = Self::get_cpu_usage() {
300 let metric = PerformanceMetric {
301 metric_type:MetricType::CpuUsage,
302
303 value:cpu_usage,
304
305 timestamp:SystemTime::now()
306 .duration_since(SystemTime::UNIX_EPOCH)
307 .unwrap_or_default()
308 .as_millis() as u64,
309
310 channel:None,
311
312 tags:HashMap::new(),
313 };
314
315 self.record_metric(metric).await;
316 }
317 }
318
319 async fn update_statistics(&self) {
320 let metrics = self.metrics.read().await;
321
322 let mut stats = self.statistics.write().await;
323
324 let processing_metrics:Vec<&PerformanceMetric> = metrics
325 .iter()
326 .filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
327 .collect();
328
329 if !processing_metrics.is_empty() {
330 let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
331
332 stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
333
334 stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
335 }
336
337 let error_metrics:Vec<&PerformanceMetric> = metrics
338 .iter()
339 .filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
340 .collect();
341
342 if !error_metrics.is_empty() {
343 let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
344
345 stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
346 }
347
348 let throughput_metrics:Vec<&PerformanceMetric> = metrics
349 .iter()
350 .filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
351 .collect();
352
353 if !throughput_metrics.is_empty() {
354 let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
355
356 stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
357 }
358
359 let memory_metrics:Vec<&PerformanceMetric> = metrics
360 .iter()
361 .filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
362 .collect();
363
364 if !memory_metrics.is_empty() {
365 let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
366
367 stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
368 }
369
370 stats.last_update = SystemTime::now()
371 .duration_since(SystemTime::UNIX_EPOCH)
372 .unwrap_or_default()
373 .as_secs();
374 }
375
376 async fn check_alerts(&self, metric:&PerformanceMetric) {
377 let threshold = match metric.metric_type {
378 MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
379
380 MetricType::ErrorRate => 5.0,
381
382 MetricType::MemoryUsage => 1024.0,
383
384 MetricType::CpuUsage => 90.0,
385
386 _ => return,
387 };
388
389 if metric.value > threshold {
390 let severity = match metric.value / threshold {
391 ratio if ratio > 5.0 => AlertSeverity::Critical,
392
393 ratio if ratio > 3.0 => AlertSeverity::High,
394
395 ratio if ratio > 2.0 => AlertSeverity::Medium,
396
397 _ => AlertSeverity::Low,
398 };
399
400 let alert = PerformanceAlert {
401 alert_id:Self::generate_alert_id(),
402
403 metric_type:metric.metric_type.clone(),
404
405 threshold,
406
407 current_value:metric.value,
408
409 timestamp:metric.timestamp,
410
411 channel:metric.channel.clone(),
412
413 severity,
414
415 message:format!(
416 "{} exceeded threshold: {} > {}",
417 Self::metric_type_name(&metric.metric_type),
418 metric.value,
419 threshold
420 ),
421 };
422
423 {
424 let mut alerts = self.alerts.write().await;
425
426 alerts.push_back(alert.clone());
427 }
428
429 {
430 let mut stats = self.statistics.write().await;
431
432 stats.total_alerts_triggered += 1;
433 }
434
435 dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
436 }
437 }
438
439 async fn check_performance_alerts(&self) {
440 dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
441 }
442
443 async fn cleanup_old_data(&self) {
444 let retention_threshold = SystemTime::now()
445 .duration_since(SystemTime::UNIX_EPOCH)
446 .unwrap_or_default()
447 .as_secs()
448 - (self.config.metrics_retention_hours * 3600);
449
450 {
451 let mut metrics = self.metrics.write().await;
452
453 metrics.retain(|m| m.timestamp >= retention_threshold);
454 }
455
456 {
457 let mut traces = self.traces.write().await;
458
459 traces.retain(|_, span| span.start_time >= retention_threshold);
460
461 if traces.len() > self.config.max_traces_stored {
462 let excess = traces.len() - self.config.max_traces_stored;
463
464 let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
465
466 for key in keys_to_remove {
467 traces.remove(&key);
468 }
469 }
470 }
471
472 {
473 let mut alerts = self.alerts.write().await;
474
475 alerts.retain(|a| a.timestamp >= retention_threshold);
476 }
477
478 dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
479 }
480
481 fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }
482
483 fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }
484
485 fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }
486
487 fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }
488
489 fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
490
491 fn metric_type_name(metric_type:&MetricType) -> &'static str {
492 match metric_type {
493 MetricType::MessageProcessingTime => "Message Processing Time",
494
495 MetricType::ConnectionLatency => "Connection Latency",
496
497 MetricType::MemoryUsage => "Memory Usage",
498
499 MetricType::CpuUsage => "CPU Usage",
500
501 MetricType::NetworkThroughput => "Network Throughput",
502
503 MetricType::ErrorRate => "Error Rate",
504
505 MetricType::QueueSize => "Queue Size",
506 }
507 }
508
509 pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
510
511 pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
512 let metrics = self.metrics.read().await;
513
514 metrics.iter().rev().take(limit).cloned().collect()
515 }
516
517 pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
518 let alerts = self.alerts.read().await;
519
520 alerts.iter().rev().cloned().collect()
521 }
522
523 pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
524 let traces = self.traces.read().await;
525
526 traces.values().find(|span| span.trace_id == trace_id).cloned()
527 }
528
529 pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
530
531 pub fn high_frequency_dashboard() -> Self {
532 Self::new(DashboardConfig {
533 update_interval_ms:1000,
534 metrics_retention_hours:1,
535 alert_threshold_ms:500,
536 trace_sampling_rate:1.0,
537 max_traces_stored:5000,
538 })
539 }
540
541 pub fn create_metric(
542 metric_type:MetricType,
543
544 value:f64,
545
546 channel:Option<String>,
547
548 tags:HashMap<String, String>,
549 ) -> PerformanceMetric {
550 PerformanceMetric {
551 metric_type,
552
553 value,
554
555 timestamp:SystemTime::now()
556 .duration_since(SystemTime::UNIX_EPOCH)
557 .unwrap_or_default()
558 .as_millis() as u64,
559
560 channel,
561
562 tags,
563 }
564 }
565
566 pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
567 TraceLog {
568 timestamp:SystemTime::now()
569 .duration_since(SystemTime::UNIX_EPOCH)
570 .unwrap_or_default()
571 .as_millis() as u64,
572
573 message,
574
575 level,
576
577 fields,
578 }
579 }
580
581 pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
582 let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
583
584 let error_score = 100.0 * (1.0 - error_rate / 100.0);
585
586 let throughput_score = throughput / 1000.0;
587
588 (time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
589 .max(0.0)
590 .min(100.0)
591 }
592
593 pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
594 match metric_type {
595 MetricType::MessageProcessingTime => format!("{:.2}ms", value),
596
597 MetricType::ConnectionLatency => format!("{:.2}ms", value),
598
599 MetricType::MemoryUsage => format!("{:.2}MB", value),
600
601 MetricType::CpuUsage => format!("{:.2}%", value),
602
603 MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
604
605 MetricType::ErrorRate => format!("{:.2}%", value),
606
607 MetricType::QueueSize => format!("{:.0}", value),
608 }
609 }
610}
611
612impl Clone for Struct {
613 fn clone(&self) -> Self {
614 Self {
615 config:self.config.clone(),
616
617 metrics:self.metrics.clone(),
618
619 traces:self.traces.clone(),
620
621 alerts:self.alerts.clone(),
622
623 statistics:self.statistics.clone(),
624
625 is_running:Arc::new(AsyncMutex::new(false)),
626 }
627 }
628}