Mountain/IPC/Enhanced/MessageCompressor/
Compressor.rs1
2use std::{
9 collections::VecDeque,
10 io::{Read, Write},
11};
12
13use bincode::serde::{decode_from_slice, encode_to_vec};
14use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
15use flate2::{
16 Compression,
17 write::{GzEncoder, ZlibEncoder},
18};
19use tokio::time::Instant;
20
21use crate::IPC::Enhanced::MessageCompressor::{
22 BatchConfig::Struct as BatchConfig,
23 BatchStats::Struct as BatchStats,
24 CompressedBatch::Struct as CompressedBatch,
25 CompressionAlgorithm::Enum as CompressionAlgorithm,
26 CompressionInfo::Struct as CompressionInfo,
27 CompressionLevel::Enum as CompressionLevel,
28};
29
30pub struct Struct {
31 pub(super) Config:BatchConfig,
32
33 pub(super) CurrentBatch:VecDeque<Vec<u8>>,
34
35 pub(super) BatchStartTime:Option<Instant>,
36
37 pub(super) BatchSizeBytes:usize,
38}
39
40impl Struct {
41 pub fn new(config:BatchConfig) -> Self {
42 Self {
43 Config:config,
44
45 CurrentBatch:VecDeque::new(),
46
47 BatchStartTime:None,
48
49 BatchSizeBytes:0,
50 }
51 }
52
53 pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
54 let MessageSize = MessageData.len();
55
56 let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
57
58 if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
59 return false;
60 }
61
62 self.CurrentBatch.push_back(MessageData.to_vec());
63
64 self.BatchSizeBytes += MessageSize;
65
66 if self.BatchStartTime.is_none() {
67 self.BatchStartTime = Some(Instant::now());
68 }
69
70 true
71 }
72
73 pub fn should_flush(&self) -> bool {
74 if self.CurrentBatch.is_empty() {
75 return false;
76 }
77
78 if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
79 return true;
80 }
81
82 if let Some(start_time) = self.BatchStartTime {
83 let elapsed = start_time.elapsed();
84
85 if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
86 return true;
87 }
88 }
89
90 false
91 }
92
93 pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
94 if self.CurrentBatch.is_empty() {
95 return Err("No messages in batch to flush".to_string());
96 }
97
98 let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
99
100 let total_size = self.BatchSizeBytes;
101
102 self.BatchStartTime = None;
103
104 self.BatchSizeBytes = 0;
105
106 let config = bincode::config::standard();
107
108 let serialized_batch =
109 encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
110
111 let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
112 self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
113 } else {
114 Ok((None, CompressionInfo::none()))
115 }?;
116
117 Ok(CompressedBatch {
118 messages_count:BatchMessages.len(),
119 original_size:total_size,
120 compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size),
121 compressed_data:CompressedData,
122 compression_info,
123 timestamp:std::time::SystemTime::now()
124 .duration_since(std::time::UNIX_EPOCH)
125 .unwrap_or_default()
126 .as_millis() as u64,
127 })
128 }
129
130 fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
131 match self.Config.Algorithm {
132 CompressionAlgorithm::Brotli => self.compress_brotli(data),
133
134 CompressionAlgorithm::Gzip => self.compress_gzip(data),
135
136 CompressionAlgorithm::Zlib => self.compress_zlib(data),
137 }
138 }
139
140 fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
141 let mut params = BrotliEncoderParams::default();
142
143 params.quality = self.Config.CompressionLevel as i32;
144
145 let mut compressed = Vec::new();
146
147 {
148 let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), ¶ms);
149
150 std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
151
152 writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
153 }
154
155 let ratio = data.len() as f64 / compressed.len() as f64;
156
157 Ok((
158 compressed,
159 CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
160 ))
161 }
162
163 fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
164 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
165
166 encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
167
168 let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
169
170 let ratio = data.len() as f64 / compressed.len() as f64;
171
172 Ok((
173 compressed,
174 CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
175 ))
176 }
177
178 fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
179 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
180
181 encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
182
183 let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
184
185 let ratio = data.len() as f64 / compressed.len() as f64;
186
187 Ok((
188 compressed,
189 CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
190 ))
191 }
192
193 pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
194 let data = if let Some(ref compressed_data) = batch.compressed_data {
195 self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
196 } else {
197 encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
198 };
199
200 let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
201 .map_err(|e| format!("Failed to deserialize batch: {}", e))?;
202
203 Ok(decoded)
204 }
205
206 fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
207 match algorithm {
208 "brotli" => self.decompress_brotli(data),
209
210 "gzip" => self.decompress_gzip(data),
211
212 "zlib" => self.decompress_zlib(data),
213
214 _ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
215 }
216 }
217
218 fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
219 let mut decompressed = Vec::new();
220
221 let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
222
223 std::io::Read::read_to_end(&mut reader, &mut decompressed)
224 .map_err(|e| format!("Brotli decompression failed: {}", e))?;
225
226 Ok(decompressed)
227 }
228
229 fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
230 use flate2::read::GzDecoder;
231
232 let mut decoder = GzDecoder::new(data);
233
234 let mut decompressed = Vec::new();
235
236 decoder
237 .read_to_end(&mut decompressed)
238 .map_err(|e| format!("Gzip decompression failed: {}", e))?;
239
240 Ok(decompressed)
241 }
242
243 fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
244 use flate2::read::ZlibDecoder;
245
246 let mut decoder = ZlibDecoder::new(data);
247
248 let mut decompressed = Vec::new();
249
250 decoder
251 .read_to_end(&mut decompressed)
252 .map_err(|e| format!("Zlib decompression failed: {}", e))?;
253
254 Ok(decompressed)
255 }
256
257 pub fn get_batch_stats(&self) -> BatchStats {
258 BatchStats {
259 messages_count:self.CurrentBatch.len(),
260
261 total_size_bytes:self.BatchSizeBytes,
262
263 batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
264 }
265 }
266
267 pub fn clear_batch(&mut self) {
268 self.CurrentBatch.clear();
269
270 self.BatchStartTime = None;
271
272 self.BatchSizeBytes = 0;
273 }
274
275 pub fn compress_single_message(
276 message_data:&[u8],
277
278 algorithm:CompressionAlgorithm,
279
280 level:CompressionLevel,
281 ) -> Result<(Vec<u8>, CompressionInfo), String> {
282 let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
283
284 let compressor = Self::new(config);
285
286 compressor.compress_data(message_data)
287 }
288
289 pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
290 if compressed_size == 0 {
291 return 0.0;
292 }
293
294 original_size as f64 / compressed_size as f64
295 }
296
297 pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
298 (original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
299 }
300}