Skip to main content

Mountain/IPC/Enhanced/MessageCompressor/
Compressor.rs

1
2//! `Compressor::Struct` - message batching + compression
3//! engine. Buffers messages until size or time triggers a
4//! flush, then emits a `CompressedBatch::Struct` using the
5//! configured algorithm. Struct + 14-method impl + utility
6//! functions stay in one file - tightly coupled cluster.
7
8use std::{
9	collections::VecDeque,
10	io::{Read, Write},
11};
12
13use bincode::serde::{decode_from_slice, encode_to_vec};
14use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
15use flate2::{
16	Compression,
17	write::{GzEncoder, ZlibEncoder},
18};
19use tokio::time::Instant;
20
21use crate::IPC::Enhanced::MessageCompressor::{
22	BatchConfig::Struct as BatchConfig,
23	BatchStats::Struct as BatchStats,
24	CompressedBatch::Struct as CompressedBatch,
25	CompressionAlgorithm::Enum as CompressionAlgorithm,
26	CompressionInfo::Struct as CompressionInfo,
27	CompressionLevel::Enum as CompressionLevel,
28};
29
30pub struct Struct {
31	pub(super) Config:BatchConfig,
32
33	pub(super) CurrentBatch:VecDeque<Vec<u8>>,
34
35	pub(super) BatchStartTime:Option<Instant>,
36
37	pub(super) BatchSizeBytes:usize,
38}
39
40impl Struct {
41	pub fn new(config:BatchConfig) -> Self {
42		Self {
43			Config:config,
44
45			CurrentBatch:VecDeque::new(),
46
47			BatchStartTime:None,
48
49			BatchSizeBytes:0,
50		}
51	}
52
53	pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
54		let MessageSize = MessageData.len();
55
56		let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
57
58		if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
59			return false;
60		}
61
62		self.CurrentBatch.push_back(MessageData.to_vec());
63
64		self.BatchSizeBytes += MessageSize;
65
66		if self.BatchStartTime.is_none() {
67			self.BatchStartTime = Some(Instant::now());
68		}
69
70		true
71	}
72
73	pub fn should_flush(&self) -> bool {
74		if self.CurrentBatch.is_empty() {
75			return false;
76		}
77
78		if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
79			return true;
80		}
81
82		if let Some(start_time) = self.BatchStartTime {
83			let elapsed = start_time.elapsed();
84
85			if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
86				return true;
87			}
88		}
89
90		false
91	}
92
93	pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
94		if self.CurrentBatch.is_empty() {
95			return Err("No messages in batch to flush".to_string());
96		}
97
98		let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
99
100		let total_size = self.BatchSizeBytes;
101
102		self.BatchStartTime = None;
103
104		self.BatchSizeBytes = 0;
105
106		let config = bincode::config::standard();
107
108		let serialized_batch =
109			encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
110
111		let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
112			self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
113		} else {
114			Ok((None, CompressionInfo::none()))
115		}?;
116
117		Ok(CompressedBatch {
118			messages_count:BatchMessages.len(),
119			original_size:total_size,
120			compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size),
121			compressed_data:CompressedData,
122			compression_info,
123			timestamp:std::time::SystemTime::now()
124				.duration_since(std::time::UNIX_EPOCH)
125				.unwrap_or_default()
126				.as_millis() as u64,
127		})
128	}
129
130	fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
131		match self.Config.Algorithm {
132			CompressionAlgorithm::Brotli => self.compress_brotli(data),
133
134			CompressionAlgorithm::Gzip => self.compress_gzip(data),
135
136			CompressionAlgorithm::Zlib => self.compress_zlib(data),
137		}
138	}
139
140	fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
141		let mut params = BrotliEncoderParams::default();
142
143		params.quality = self.Config.CompressionLevel as i32;
144
145		let mut compressed = Vec::new();
146
147		{
148			let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), &params);
149
150			std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
151
152			writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
153		}
154
155		let ratio = data.len() as f64 / compressed.len() as f64;
156
157		Ok((
158			compressed,
159			CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
160		))
161	}
162
163	fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
164		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
165
166		encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
167
168		let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
169
170		let ratio = data.len() as f64 / compressed.len() as f64;
171
172		Ok((
173			compressed,
174			CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
175		))
176	}
177
178	fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
179		let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
180
181		encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
182
183		let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
184
185		let ratio = data.len() as f64 / compressed.len() as f64;
186
187		Ok((
188			compressed,
189			CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
190		))
191	}
192
193	pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
194		let data = if let Some(ref compressed_data) = batch.compressed_data {
195			self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
196		} else {
197			encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
198		};
199
200		let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
201			.map_err(|e| format!("Failed to deserialize batch: {}", e))?;
202
203		Ok(decoded)
204	}
205
206	fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
207		match algorithm {
208			"brotli" => self.decompress_brotli(data),
209
210			"gzip" => self.decompress_gzip(data),
211
212			"zlib" => self.decompress_zlib(data),
213
214			_ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
215		}
216	}
217
218	fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
219		let mut decompressed = Vec::new();
220
221		let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
222
223		std::io::Read::read_to_end(&mut reader, &mut decompressed)
224			.map_err(|e| format!("Brotli decompression failed: {}", e))?;
225
226		Ok(decompressed)
227	}
228
229	fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
230		use flate2::read::GzDecoder;
231
232		let mut decoder = GzDecoder::new(data);
233
234		let mut decompressed = Vec::new();
235
236		decoder
237			.read_to_end(&mut decompressed)
238			.map_err(|e| format!("Gzip decompression failed: {}", e))?;
239
240		Ok(decompressed)
241	}
242
243	fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
244		use flate2::read::ZlibDecoder;
245
246		let mut decoder = ZlibDecoder::new(data);
247
248		let mut decompressed = Vec::new();
249
250		decoder
251			.read_to_end(&mut decompressed)
252			.map_err(|e| format!("Zlib decompression failed: {}", e))?;
253
254		Ok(decompressed)
255	}
256
257	pub fn get_batch_stats(&self) -> BatchStats {
258		BatchStats {
259			messages_count:self.CurrentBatch.len(),
260
261			total_size_bytes:self.BatchSizeBytes,
262
263			batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
264		}
265	}
266
267	pub fn clear_batch(&mut self) {
268		self.CurrentBatch.clear();
269
270		self.BatchStartTime = None;
271
272		self.BatchSizeBytes = 0;
273	}
274
275	pub fn compress_single_message(
276		message_data:&[u8],
277
278		algorithm:CompressionAlgorithm,
279
280		level:CompressionLevel,
281	) -> Result<(Vec<u8>, CompressionInfo), String> {
282		let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
283
284		let compressor = Self::new(config);
285
286		compressor.compress_data(message_data)
287	}
288
289	pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
290		if compressed_size == 0 {
291			return 0.0;
292		}
293
294		original_size as f64 / compressed_size as f64
295	}
296
297	pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
298		(original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
299	}
300}