use std::{
    collections::VecDeque,
    io::{Read, Write},
    time::Duration,
};

use bincode::serde::{decode_from_slice, encode_to_vec};
use brotli::{CompressorReader, CompressorWriter, Decompressor, enc::BrotliEncoderParams};
use flate2::{
    Compression,
    write::{GzEncoder, ZlibEncoder},
};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
22
/// Compression effort preset shared by all codecs.
///
/// Each discriminant is the numeric level requested from the encoders: Brotli
/// takes it as its quality setting, gzip/zlib as their deflate level.
/// NOTE(review): `High` (11) is above the 0–9 range flate2 documents for
/// `Compression::new`; confirm how the gzip/zlib paths treat it.
///
/// serde encodes the variant *index* (0, 1, 2), not the discriminant, so the
/// variant order is part of the serialized form — do not reorder.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CompressionLevel {
    /// Lowest effort (level 1).
    Fast = 1,
    /// Middle ground (level 6); the `BatchConfig` default.
    Balanced = 6,
    /// Highest effort (level 11, Brotli's maximum quality).
    High = 11,
}
30
/// Codec applied to a batch whose raw size reaches
/// `BatchConfig::CompressionThresholdBytes`.
///
/// serde encodes unit variants by index, so variant order is part of the
/// serialized form — do not reorder.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// Brotli; recorded as `"brotli"` in `CompressionInfo::algorithm`.
    Brotli,
    /// gzip (deflate with gzip framing); recorded as `"gzip"`.
    Gzip,
    /// zlib (deflate with zlib framing); recorded as `"zlib"`.
    Zlib,
}
38
/// Tuning knobs for `MessageCompressor`: when a batch is flushed and how it is
/// compressed.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; field order may be
/// part of a persisted or wire format — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
    /// Message count at which `should_flush` reports the batch as due.
    /// `add_message` also uses `MaxBatchSize * 1024` as the batch's byte budget.
    pub MaxBatchSize:usize,
    /// Age in milliseconds after which a non-empty batch is due for flushing.
    pub MaxBatchDelayMs:u64,
    /// Minimum raw batch size, in bytes, at which `flush_batch` compresses.
    pub CompressionThresholdBytes:usize,
    /// Effort preset requested from the chosen codec.
    pub CompressionLevel:CompressionLevel,
    /// Codec used for batches at or above the threshold.
    pub Algorithm:CompressionAlgorithm,
}
48
49impl Default for BatchConfig {
50 fn default() -> Self {
51 Self {
52 MaxBatchSize:100,
53 MaxBatchDelayMs:100,
54 CompressionThresholdBytes:1024,
56 CompressionLevel:CompressionLevel::Balanced,
57 Algorithm:CompressionAlgorithm::Brotli,
58 }
59 }
60}
61
62pub struct MessageCompressor {
64 Config:BatchConfig,
65 CurrentBatch:VecDeque<Vec<u8>>,
66 BatchStartTime:Option<Instant>,
67 BatchSizeBytes:usize,
68}
69
70impl MessageCompressor {
71 pub fn new(config:BatchConfig) -> Self {
73 Self {
74 Config:config,
75 CurrentBatch:VecDeque::new(),
76 BatchStartTime:None,
77 BatchSizeBytes:0,
78 }
79 }
80
81 pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
83 let MessageSize = MessageData.len();
84 let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
85
86 if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
88 return false;
91 }
92
93 self.CurrentBatch.push_back(MessageData.to_vec());
95 self.BatchSizeBytes += MessageSize;
96
97 if self.BatchStartTime.is_none() {
99 self.BatchStartTime = Some(Instant::now());
100 }
101
102 true
103 }
104
105 pub fn should_flush(&self) -> bool {
107 if self.CurrentBatch.is_empty() {
108 return false;
109 }
110
111 if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
113 return true;
114 }
115
116 if let Some(start_time) = self.BatchStartTime {
118 let elapsed = start_time.elapsed();
119 if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
120 return true;
121 }
122 }
123
124 false
125 }
126
127 pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
129 if self.CurrentBatch.is_empty() {
130 return Err("No messages in batch to flush".to_string());
131 }
132
133 let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
134 let total_size = self.BatchSizeBytes;
135
136 self.BatchStartTime = None;
138 self.BatchSizeBytes = 0;
139
140 let config = bincode::config::standard();
142 let serialized_batch =
143 encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
144
145 let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
147 self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
148 } else {
149 Ok((None, CompressionInfo::none()))
150 }?;
151
152 Ok(CompressedBatch {
153 messages_count:BatchMessages.len(),
154 original_size:total_size,
155 compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size) as usize,
156 compressed_data:CompressedData,
157 compression_info,
158 timestamp:std::time::SystemTime::now()
159 .duration_since(std::time::UNIX_EPOCH)
160 .unwrap_or_default()
161 .as_millis() as u64,
162 })
163 }
164
165 fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
167 match self.Config.Algorithm {
168 CompressionAlgorithm::Brotli => self.compress_brotli(data),
169 CompressionAlgorithm::Gzip => self.compress_gzip(data),
170 CompressionAlgorithm::Zlib => self.compress_zlib(data),
171 }
172 }
173
174 fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
176 let mut params = BrotliEncoderParams::default();
177 params.quality = self.Config.CompressionLevel as i32;
178
179 let mut compressed = Vec::new();
180 {
181 let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), ¶ms);
182 std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
183 writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
184 } let ratio = data.len() as f64 / compressed.len() as f64;
187
188 Ok((
189 compressed,
190 CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
191 ))
192 }
193
194 fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
196 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
197 encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
198
199 let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
200
201 let ratio = data.len() as f64 / compressed.len() as f64;
202
203 Ok((
204 compressed,
205 CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
206 ))
207 }
208
209 fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
211 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
212 encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
213
214 let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
215
216 let ratio = data.len() as f64 / compressed.len() as f64;
217
218 Ok((
219 compressed,
220 CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
221 ))
222 }
223
224 pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
226 let data = if let Some(ref compressed_data) = batch.compressed_data {
227 self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
228 } else {
229 encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
230 };
231
232 let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
233 .map_err(|e| format!("Failed to deserialize batch: {}", e))?;
234 Ok(decoded)
235 }
236
237 fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
239 match algorithm {
240 "brotli" => self.decompress_brotli(data),
241 "gzip" => self.decompress_gzip(data),
242 "zlib" => self.decompress_zlib(data),
243 _ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
244 }
245 }
246
247 fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
249 let mut decompressed = Vec::new();
250 let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
251
252 std::io::Read::read_to_end(&mut reader, &mut decompressed)
253 .map_err(|e| format!("Brotli decompression failed: {}", e))?;
254
255 Ok(decompressed)
256 }
257
258 fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
260 use flate2::read::GzDecoder;
261
262 let mut decoder = GzDecoder::new(data);
263 let mut decompressed = Vec::new();
264 decoder
265 .read_to_end(&mut decompressed)
266 .map_err(|e| format!("Gzip decompression failed: {}", e))?;
267
268 Ok(decompressed)
269 }
270
271 fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
273 use flate2::read::ZlibDecoder;
274
275 let mut decoder = ZlibDecoder::new(data);
276 let mut decompressed = Vec::new();
277 decoder
278 .read_to_end(&mut decompressed)
279 .map_err(|e| format!("Zlib decompression failed: {}", e))?;
280
281 Ok(decompressed)
282 }
283
284 pub fn get_batch_stats(&self) -> BatchStats {
286 BatchStats {
287 messages_count:self.CurrentBatch.len(),
288 total_size_bytes:self.BatchSizeBytes,
289 batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
290 }
291 }
292
293 pub fn clear_batch(&mut self) {
295 self.CurrentBatch.clear();
296 self.BatchStartTime = None;
297 self.BatchSizeBytes = 0;
298 }
299}
300
/// Output of `MessageCompressor::flush_batch`: the batch payload plus metadata
/// describing how it was produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBatch {
    /// Number of messages in the batch.
    pub messages_count:usize,
    /// Sum of the raw message lengths in bytes (before bincode encoding).
    pub original_size:usize,
    /// Byte length of the compressed payload, or `original_size` when no
    /// compression was applied.
    pub compressed_size:usize,
    /// Encoded batch payload (bincode-serialized `Vec<Vec<u8>>`), compressed with
    /// `compression_info.algorithm` unless that is `"none"`. `None` means no payload
    /// was attached.
    pub compressed_data:Option<Vec<u8>>,
    /// Codec metadata; `algorithm` is `"none"` when the batch was not compressed.
    pub compression_info:CompressionInfo,
    /// Flush time in milliseconds since the Unix epoch (0 if the system clock
    /// reads earlier than the epoch).
    pub timestamp:u64,
}
311
/// Describes how a payload was compressed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionInfo {
    /// Codec name: `"brotli"`, `"gzip"`, `"zlib"`, or `"none"`.
    pub algorithm:String,
    /// Numeric level passed to the codec (0 for `"none"`).
    pub level:u32,
    /// Input length divided by output length of the compression step
    /// (1.0 for `"none"`).
    pub ratio:f64,
}
319
320impl CompressionInfo {
321 fn none() -> Self { Self { algorithm:"none".to_string(), level:0, ratio:1.0 } }
322}
323
/// Snapshot of the pending (not yet flushed) batch, from
/// `MessageCompressor::get_batch_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchStats {
    /// Messages currently queued.
    pub messages_count:usize,
    /// Sum of their raw lengths in bytes.
    pub total_size_bytes:usize,
    /// Milliseconds since the first queued message was accepted (0 when no
    /// batch is open).
    pub batch_age_ms:u64,
}
331
332impl MessageCompressor {
334 pub fn compress_single_message(
336 message_data:&[u8],
337 algorithm:CompressionAlgorithm,
338 level:CompressionLevel,
339 ) -> Result<(Vec<u8>, CompressionInfo), String> {
340 let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
341
342 let compressor = MessageCompressor::new(config);
343 compressor.compress_data(message_data)
344 }
345
346 pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
348 if compressed_size == 0 {
349 return 0.0;
350 }
351 original_size as f64 / compressed_size as f64
352 }
353
354 pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
356 (original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Repetitive sample text. gzip adds a fixed header and trailer, so a single
    /// short sentence can come out larger than its input; a longer, repetitive
    /// payload is needed to assert that compression actually shrinks data.
    fn sample_text() -> Vec<u8> { b"This is a test message for compression evaluation. ".repeat(20) }

    #[test]
    fn test_message_compression() {
        let test_data = sample_text();

        let (compressed, info) = MessageCompressor::compress_single_message(
            &test_data,
            CompressionAlgorithm::Brotli,
            CompressionLevel::Balanced,
        )
        .unwrap();

        assert!(compressed.len() < test_data.len());
        assert!(info.ratio > 1.0);
        assert_eq!(info.algorithm, "brotli");

        let (compressed_gzip, info_gzip) = MessageCompressor::compress_single_message(
            &test_data,
            CompressionAlgorithm::Gzip,
            CompressionLevel::Balanced,
        )
        .unwrap();

        assert!(compressed_gzip.len() < test_data.len());
        assert!(info_gzip.ratio > 1.0);
        assert_eq!(info_gzip.algorithm, "gzip");
    }

    #[test]
    fn test_batch_compression() {
        // Make the batch due by message count. With the default limit of 100
        // messages, `should_flush` only turns true once the 100 ms deadline passes,
        // and this test never waits for it.
        let config = BatchConfig { MaxBatchSize:5, ..Default::default() };
        let mut compressor = MessageCompressor::new(config);

        let messages:Vec<Vec<u8>> = (0..5).map(|i| format!("Message {}", i).into_bytes()).collect();

        for message in &messages {
            assert!(compressor.add_message(message));
        }

        assert!(compressor.should_flush());

        let batch = compressor.flush_batch().unwrap();
        assert_eq!(batch.messages_count, 5);
        assert!(batch.compressed_size <= batch.original_size);
    }

    #[test]
    fn test_gzip_batch_round_trip() {
        let config = BatchConfig { MaxBatchSize:10, Algorithm:CompressionAlgorithm::Gzip, ..Default::default() };
        let mut compressor = MessageCompressor::new(config);

        // 3 x 500 bytes: above the 1024-byte compression threshold, within the
        // 10 * 1024-byte budget.
        let messages:Vec<Vec<u8>> = (0..3).map(|i| format!("payload {} ", i).repeat(50).into_bytes()).collect();

        for message in &messages {
            assert!(compressor.add_message(message));
        }

        let batch = compressor.flush_batch().unwrap();
        assert_eq!(batch.compression_info.algorithm, "gzip");
        assert!(batch.compressed_size < batch.original_size);
        assert_eq!(compressor.decompress_batch(&batch).unwrap(), messages);
    }

    #[test]
    fn test_compression_ratio_calculation() {
        let ratio = MessageCompressor::calculate_compression_ratio(1000, 500);
        assert_eq!(ratio, 2.0);

        let savings = MessageCompressor::estimate_savings(1000, 2.0);
        assert_eq!(savings, 500);
    }
}