Skip to main content

Mountain/IPC/AdvancedFeatures/
Features.rs

1
//! `AdvancedFeatures` aggregator - holds the runtime handle, the
//! cumulative `PerformanceStats::Struct`, the realtime
//! collaboration-session map, and the `MessageCache::Struct`.
//! `start_monitoring` spawns three monitor tasks
//! (`monitor_performance`, `cleanup_cache`,
//! `monitor_collaboration_sessions`).
//! The 12-method impl is kept in one file because the methods form a
//! tightly-coupled cluster.
10
11use std::{
12	collections::HashMap,
13	sync::{Arc, Mutex},
14	time::{Duration, SystemTime},
15};
16
17use tauri::Emitter;
18use tokio::time::interval;
19
20use crate::{
21	IPC::AdvancedFeatures::{
22		CachedMessage::Struct as CachedMessage,
23		CollaborationPermissions::Struct as CollaborationPermissions,
24		CollaborationSession::Struct as CollaborationSession,
25		MessageCache::Struct as MessageCache,
26		PerformanceStats::Struct as PerformanceStats,
27	},
28	RunTime::ApplicationRunTime::ApplicationRunTime,
29	dev_log,
30};
31
32#[derive(Clone)]
33pub struct Struct {
34	pub(super) runtime:Arc<ApplicationRunTime>,
35
36	pub(super) performance_stats:Arc<Mutex<PerformanceStats>>,
37
38	pub(super) collaboration_sessions:Arc<Mutex<HashMap<String, CollaborationSession>>>,
39
40	pub(super) message_cache:Arc<Mutex<MessageCache>>,
41}
42
43impl Struct {
44	pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
45		dev_log!("lifecycle", "Initializing advanced IPC features");
46
47		Self {
48			runtime,
49
50			performance_stats:Arc::new(Mutex::new(PerformanceStats {
51				total_messages_sent:0,
52				total_messages_received:0,
53				average_processing_time_ms:0.0,
54				peak_message_rate:0,
55				error_count:0,
56				last_update:SystemTime::now()
57					.duration_since(SystemTime::UNIX_EPOCH)
58					.unwrap_or_default()
59					.as_secs(),
60				connection_uptime:0,
61			})),
62
63			collaboration_sessions:Arc::new(Mutex::new(HashMap::new())),
64
65			message_cache:Arc::new(Mutex::new(MessageCache {
66				cached_messages:HashMap::new(),
67				cache_hits:0,
68				cache_misses:0,
69				cache_size:0,
70			})),
71		}
72	}
73
74	pub async fn start_monitoring(&self) -> Result<(), String> {
75		dev_log!("lifecycle", "Starting advanced monitoring");
76
77		let features1 = self.clone_features();
78
79		let features2 = self.clone_features();
80
81		let features3 = self.clone_features();
82
83		tokio::spawn(async move {
84			features1.monitor_performance().await;
85		});
86
87		tokio::spawn(async move {
88			features2.cleanup_cache().await;
89		});
90
91		tokio::spawn(async move {
92			features3.monitor_collaboration_sessions().await;
93		});
94
95		Ok(())
96	}
97
98	async fn monitor_performance(&self) {
99		let mut interval = interval(Duration::from_secs(10));
100
101		loop {
102			interval.tick().await;
103
104			let stats = self.calculate_performance_stats().await;
105
106			if let Err(e) = self.runtime.Environment.ApplicationHandle.emit("ipc-performance-stats", &stats) {
107				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit performance stats: {}", e);
108			}
109
110			dev_log!("lifecycle", "Performance stats updated");
111		}
112	}
113
114	async fn calculate_performance_stats(&self) -> PerformanceStats {
115		let mut stats = self.performance_stats.lock().unwrap();
116
117		stats.connection_uptime = SystemTime::now()
118			.duration_since(SystemTime::UNIX_EPOCH)
119			.unwrap_or_default()
120			.as_secs()
121			- stats.last_update;
122
123		stats.last_update = SystemTime::now()
124			.duration_since(SystemTime::UNIX_EPOCH)
125			.unwrap_or_default()
126			.as_secs();
127
128		stats.clone()
129	}
130
131	async fn cleanup_cache(&self) {
132		let mut interval = interval(Duration::from_secs(60));
133
134		loop {
135			interval.tick().await;
136
137			let current_time = SystemTime::now()
138				.duration_since(SystemTime::UNIX_EPOCH)
139				.unwrap_or_default()
140				.as_secs();
141
142			let mut cache = self.message_cache.lock().unwrap();
143
144			cache
145				.cached_messages
146				.retain(|_, cached_message| current_time < cached_message.timestamp + cached_message.ttl);
147
148			cache.cache_size = cache.cached_messages.len();
149
150			dev_log!("lifecycle", "Cache cleaned, {} entries remaining", cache.cache_size);
151		}
152	}
153
154	async fn monitor_collaboration_sessions(&self) {
155		let mut interval = interval(Duration::from_secs(30));
156
157		loop {
158			interval.tick().await;
159
160			let current_time = SystemTime::now()
161				.duration_since(SystemTime::UNIX_EPOCH)
162				.unwrap_or_default()
163				.as_secs();
164
165			let mut sessions = self.collaboration_sessions.lock().unwrap();
166
167			sessions.retain(|_, session| current_time - session.last_activity < 300);
168
169			let active_sessions:Vec<CollaborationSession> = sessions.values().cloned().collect();
170
171			if let Err(e) = self
172				.runtime
173				.Environment
174				.ApplicationHandle
175				.emit("collaboration-sessions-update", &active_sessions)
176			{
177				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit collaboration sessions: {}", e);
178			}
179
180			dev_log!("lifecycle", "Collaboration sessions monitored, {} active", sessions.len());
181		}
182	}
183
184	pub async fn cache_message(&self, message_id:String, data:serde_json::Value, ttl:u64) -> Result<(), String> {
185		let mut cache = self
186			.message_cache
187			.lock()
188			.map_err(|e| format!("Failed to access message cache: {}", e))?;
189
190		let cached_message = CachedMessage {
191			data,
192
193			timestamp:SystemTime::now()
194				.duration_since(SystemTime::UNIX_EPOCH)
195				.unwrap_or_default()
196				.as_secs(),
197
198			ttl,
199		};
200
201		cache.cached_messages.insert(message_id.clone(), cached_message);
202
203		cache.cache_size = cache.cached_messages.len();
204
205		dev_log!("lifecycle", "Message cached: {}, TTL: {}s", message_id, ttl);
206
207		Ok(())
208	}
209
210	pub async fn get_cached_message(&self, message_id:&str) -> Option<serde_json::Value> {
211		let mut cache = self.message_cache.lock().unwrap();
212
213		let result = cache
214			.cached_messages
215			.get(message_id)
216			.map(|cached_message| cached_message.data.clone());
217
218		if result.is_some() {
219			cache.cache_hits += 1;
220		} else {
221			cache.cache_misses += 1;
222		}
223
224		result
225	}
226
227	pub async fn create_collaboration_session(
228		&self,
229
230		session_id:String,
231
232		permissions:CollaborationPermissions,
233	) -> Result<(), String> {
234		let mut sessions = self
235			.collaboration_sessions
236			.lock()
237			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
238
239		let session = CollaborationSession {
240			session_id:session_id.clone(),
241
242			participants:Vec::new(),
243
244			active_documents:Vec::new(),
245
246			last_activity:SystemTime::now()
247				.duration_since(SystemTime::UNIX_EPOCH)
248				.unwrap_or_default()
249				.as_secs(),
250
251			permissions,
252		};
253
254		sessions.insert(session_id, session);
255
256		dev_log!("lifecycle", "Collaboration session created");
257
258		Ok(())
259	}
260
261	pub async fn add_participant(&self, session_id:&str, participant:String) -> Result<(), String> {
262		let mut sessions = self
263			.collaboration_sessions
264			.lock()
265			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
266
267		if let Some(session) = sessions.get_mut(session_id) {
268			if !session.participants.contains(&participant) {
269				session.participants.push(participant);
270
271				session.last_activity = SystemTime::now()
272					.duration_since(SystemTime::UNIX_EPOCH)
273					.unwrap_or_default()
274					.as_secs();
275
276				dev_log!("lifecycle", "Participant added to session: {}", session_id);
277			}
278		} else {
279			return Err(format!("Session not found: {}", session_id));
280		}
281
282		Ok(())
283	}
284
285	pub async fn record_message_statistics(&self, sent:bool, processing_time_ms:u64) {
286		let mut stats = self.performance_stats.lock().unwrap();
287
288		if sent {
289			stats.total_messages_sent += 1;
290		} else {
291			stats.total_messages_received += 1;
292		}
293
294		let total_messages = stats.total_messages_sent + stats.total_messages_received;
295
296		stats.average_processing_time_ms = (stats.average_processing_time_ms * (total_messages - 1) as f64
297			+ processing_time_ms as f64)
298			/ total_messages as f64;
299	}
300
301	pub async fn record_error(&self) {
302		let mut stats = self.performance_stats.lock().unwrap();
303
304		stats.error_count += 1;
305	}
306
307	pub async fn get_performance_stats(&self) -> Result<PerformanceStats, String> {
308		Ok(self.calculate_performance_stats().await)
309	}
310
311	pub async fn get_cache_stats(&self) -> Result<MessageCache, String> {
312		let cache = self.message_cache.lock().unwrap();
313
314		Ok(cache.clone())
315	}
316
317	pub async fn get_collaboration_sessions(&self) -> Vec<CollaborationSession> {
318		let sessions = self.collaboration_sessions.lock().unwrap();
319
320		sessions.values().cloned().collect()
321	}
322
323	pub(super) fn clone_features(&self) -> Self {
324		Self {
325			runtime:self.runtime.clone(),
326
327			performance_stats:self.performance_stats.clone(),
328
329			collaboration_sessions:self.collaboration_sessions.clone(),
330
331			message_cache:self.message_cache.clone(),
332		}
333	}
334}