Mountain/IPC/Enhanced/
MessageCompressor.rs

1//! # Message Compressor and Batching
2//!
//! Message compression and batching for IPC performance optimization.
//! Queued messages are serialized as one batch and, once the batch reaches a
//! configurable size threshold, compressed with Brotli, Gzip, or Zlib.
6
7use std::{
8	collections::VecDeque,
9	io::{Read, Write},
10	time::Duration,
11};
12
13use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
14use flate2::{
15	Compression,
16	write::{GzEncoder, ZlibEncoder},
17};
18use log::{debug, error, info, trace, warn};
19use serde::{Deserialize, Serialize};
20use tokio::time::Instant;
21use bincode::serde::{decode_from_slice, encode_to_vec};
22
23/// Message compression levels
24#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
25pub enum CompressionLevel {
26	Fast = 1,
27	Balanced = 6,
28	High = 11,
29}
30
31/// Compression algorithm selection
32#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
33pub enum CompressionAlgorithm {
34	Brotli,
35	Gzip,
36	Zlib,
37}
38
39/// Message batch configuration
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct BatchConfig {
42	pub MaxBatchSize:usize,
43	pub MaxBatchDelayMs:u64,
44	pub CompressionThresholdBytes:usize,
45	pub CompressionLevel:CompressionLevel,
46	pub Algorithm:CompressionAlgorithm,
47}
48
49impl Default for BatchConfig {
50	fn default() -> Self {
51		Self {
52			MaxBatchSize:100,
53			MaxBatchDelayMs:100,
54			// Compression threshold: 1KB (messages smaller than this won't be compressed)
55			CompressionThresholdBytes:1024,
56			CompressionLevel:CompressionLevel::Balanced,
57			Algorithm:CompressionAlgorithm::Brotli,
58		}
59	}
60}
61
62/// Message compressor with batching capabilities
63pub struct MessageCompressor {
64	Config:BatchConfig,
65	CurrentBatch:VecDeque<Vec<u8>>,
66	BatchStartTime:Option<Instant>,
67	BatchSizeBytes:usize,
68}
69
70impl MessageCompressor {
71	/// Create a new message compressor with configuration
72	pub fn new(config:BatchConfig) -> Self {
73		Self {
74			Config:config,
75			CurrentBatch:VecDeque::new(),
76			BatchStartTime:None,
77			BatchSizeBytes:0,
78		}
79	}
80
81	/// Add a message to the current batch
82	pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
83		let MessageSize = MessageData.len();
84		let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
85
86		// Check if we should flush based on size
87		if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
88			// The batch has reached its configured size limit and cannot accept more
89			// messages. Caller should flush the batch before adding additional messages.
90			return false;
91		}
92
93		// Add message to batch
94		self.CurrentBatch.push_back(MessageData.to_vec());
95		self.BatchSizeBytes += MessageSize;
96
97		// Initialize batch timer if this is the first message
98		if self.BatchStartTime.is_none() {
99			self.BatchStartTime = Some(Instant::now());
100		}
101
102		true
103	}
104
105	/// Check if batch should be flushed
106	pub fn should_flush(&self) -> bool {
107		if self.CurrentBatch.is_empty() {
108			return false;
109		}
110
111		// Check batch size limit
112		if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
113			return true;
114		}
115
116		// Check time limit
117		if let Some(start_time) = self.BatchStartTime {
118			let elapsed = start_time.elapsed();
119			if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
120				return true;
121			}
122		}
123
124		false
125	}
126
127	/// Compress and flush the current batch
128	pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
129		if self.CurrentBatch.is_empty() {
130			return Err("No messages in batch to flush".to_string());
131		}
132
133		let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
134		let total_size = self.BatchSizeBytes;
135
136		// Reset batch state
137		self.BatchStartTime = None;
138		self.BatchSizeBytes = 0;
139
140		// Serialize batch using bincode 2.0 API
141		let config = bincode::config::standard();
142		let serialized_batch =
143			encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
144
145		// Compress if needed
146		let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
147			self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
148		} else {
149			Ok((None, CompressionInfo::none()))
150		}?;
151
152		Ok(CompressedBatch {
153			messages_count:BatchMessages.len(),
154			original_size:total_size,
155			compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size) as usize,
156			compressed_data:CompressedData,
157			compression_info,
158			timestamp:std::time::SystemTime::now()
159				.duration_since(std::time::UNIX_EPOCH)
160				.unwrap_or_default()
161				.as_millis() as u64,
162		})
163	}
164
165	/// Compress data using configured algorithm
166	fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
167		match self.Config.Algorithm {
168			CompressionAlgorithm::Brotli => self.compress_brotli(data),
169			CompressionAlgorithm::Gzip => self.compress_gzip(data),
170			CompressionAlgorithm::Zlib => self.compress_zlib(data),
171		}
172	}
173
174	/// Compress using Brotli algorithm
175	fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
176		let mut params = BrotliEncoderParams::default();
177		params.quality = self.Config.CompressionLevel as i32;
178
179		let mut compressed = Vec::new();
180		{
181			let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), &params);
182			std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
183			writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
184		} // writer dropped here, release borrow on compressed
185
186		let ratio = data.len() as f64 / compressed.len() as f64;
187
188		Ok((
189			compressed,
190			CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
191		))
192	}
193
194	/// Compress using Gzip algorithm
195	fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
196		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
197		encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
198
199		let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
200
201		let ratio = data.len() as f64 / compressed.len() as f64;
202
203		Ok((
204			compressed,
205			CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
206		))
207	}
208
209	/// Compress using Zlib algorithm
210	fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
211		let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
212		encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
213
214		let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
215
216		let ratio = data.len() as f64 / compressed.len() as f64;
217
218		Ok((
219			compressed,
220			CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
221		))
222	}
223
224	/// Decompress a batch
225	pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
226		let data = if let Some(ref compressed_data) = batch.compressed_data {
227			self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
228		} else {
229			encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
230		};
231
232		let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
233			.map_err(|e| format!("Failed to deserialize batch: {}", e))?;
234		Ok(decoded)
235	}
236
237	/// Decompress data using specified algorithm
238	fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
239		match algorithm {
240			"brotli" => self.decompress_brotli(data),
241			"gzip" => self.decompress_gzip(data),
242			"zlib" => self.decompress_zlib(data),
243			_ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
244		}
245	}
246
247	/// Decompress Brotli data
248	fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
249		let mut decompressed = Vec::new();
250		let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
251
252		std::io::Read::read_to_end(&mut reader, &mut decompressed)
253			.map_err(|e| format!("Brotli decompression failed: {}", e))?;
254
255		Ok(decompressed)
256	}
257
258	/// Decompress Gzip data
259	fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
260		use flate2::read::GzDecoder;
261
262		let mut decoder = GzDecoder::new(data);
263		let mut decompressed = Vec::new();
264		decoder
265			.read_to_end(&mut decompressed)
266			.map_err(|e| format!("Gzip decompression failed: {}", e))?;
267
268		Ok(decompressed)
269	}
270
271	/// Decompress Zlib data
272	fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
273		use flate2::read::ZlibDecoder;
274
275		let mut decoder = ZlibDecoder::new(data);
276		let mut decompressed = Vec::new();
277		decoder
278			.read_to_end(&mut decompressed)
279			.map_err(|e| format!("Zlib decompression failed: {}", e))?;
280
281		Ok(decompressed)
282	}
283
284	/// Get current batch statistics
285	pub fn get_batch_stats(&self) -> BatchStats {
286		BatchStats {
287			messages_count:self.CurrentBatch.len(),
288			total_size_bytes:self.BatchSizeBytes,
289			batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
290		}
291	}
292
293	/// Clear current batch without flushing
294	pub fn clear_batch(&mut self) {
295		self.CurrentBatch.clear();
296		self.BatchStartTime = None;
297		self.BatchSizeBytes = 0;
298	}
299}
300
301/// Compressed batch structure
302#[derive(Debug, Clone, Serialize, Deserialize)]
303pub struct CompressedBatch {
304	pub messages_count:usize,
305	pub original_size:usize,
306	pub compressed_size:usize,
307	pub compressed_data:Option<Vec<u8>>,
308	pub compression_info:CompressionInfo,
309	pub timestamp:u64,
310}
311
312/// Compression information
313#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct CompressionInfo {
315	pub algorithm:String,
316	pub level:u32,
317	pub ratio:f64,
318}
319
320impl CompressionInfo {
321	fn none() -> Self { Self { algorithm:"none".to_string(), level:0, ratio:1.0 } }
322}
323
324/// Batch statistics
325#[derive(Debug, Clone, Serialize, Deserialize)]
326pub struct BatchStats {
327	pub messages_count:usize,
328	pub total_size_bytes:usize,
329	pub batch_age_ms:u64,
330}
331
332/// Utility functions for message compression
333impl MessageCompressor {
334	/// Compress a single message
335	pub fn compress_single_message(
336		message_data:&[u8],
337		algorithm:CompressionAlgorithm,
338		level:CompressionLevel,
339	) -> Result<(Vec<u8>, CompressionInfo), String> {
340		let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
341
342		let compressor = MessageCompressor::new(config);
343		compressor.compress_data(message_data)
344	}
345
346	/// Calculate compression ratio
347	pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
348		if compressed_size == 0 {
349			return 0.0;
350		}
351		original_size as f64 / compressed_size as f64
352	}
353
354	/// Estimate compression savings
355	pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
356		(original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
357	}
358}
359
#[cfg(test)]
mod tests {
	use super::*;

	/// Roughly 2 KiB of highly repetitive text: large enough that fixed
	/// container overhead (such as the gzip header and trailer) cannot make the
	/// compressed output longer than the input.
	fn repetitive_payload() -> Vec<u8> { b"This is a test message for compression evaluation. ".repeat(40) }

	#[test]
	fn test_message_compression() {
		let test_data = repetitive_payload();

		let cases = [
			(CompressionAlgorithm::Brotli, "brotli"),
			(CompressionAlgorithm::Gzip, "gzip"),
			(CompressionAlgorithm::Zlib, "zlib"),
		];

		for (algorithm, expected_name) in cases {
			let (compressed, info) =
				MessageCompressor::compress_single_message(&test_data, algorithm, CompressionLevel::Balanced).unwrap();

			assert!(compressed.len() < test_data.len(), "{} did not shrink the payload", expected_name);
			assert!(info.ratio > 1.0);
			assert_eq!(info.algorithm, expected_name);
		}
	}

	#[test]
	fn test_batch_compression() {
		// Five messages fill the batch; the long delay keeps the time-based
		// trigger out of the picture.
		let config = BatchConfig { MaxBatchSize:5, MaxBatchDelayMs:60_000, ..Default::default() };
		let mut compressor = MessageCompressor::new(config);

		assert!(!compressor.should_flush(), "an empty batch is never due");

		let messages:Vec<Vec<u8>> = (0..5).map(|i| format!("Message {}", i).into_bytes()).collect();
		let total_bytes:usize = messages.iter().map(Vec::len).sum();

		for (index, message) in messages.iter().enumerate() {
			assert!(compressor.add_message(message));
			assert_eq!(compressor.should_flush(), index + 1 >= 5);
		}

		let batch = compressor.flush_batch().unwrap();
		assert_eq!(batch.messages_count, 5);
		assert_eq!(batch.original_size, total_bytes);

		// Flushing starts a fresh batch.
		let stats = compressor.get_batch_stats();
		assert_eq!(stats.messages_count, 0);
		assert_eq!(stats.total_size_bytes, 0);
		assert!(compressor.flush_batch().is_err());
	}

	#[test]
	fn test_add_message_rejects_overflowing_batch() {
		// MaxBatchSize of 1 gives a byte cap of 1024.
		let config = BatchConfig { MaxBatchSize:1, ..Default::default() };
		let mut compressor = MessageCompressor::new(config);

		assert!(!compressor.add_message(&[0u8; 2000]));
		assert_eq!(compressor.get_batch_stats().messages_count, 0);

		assert!(compressor.add_message(&[0u8; 1024]));
		assert!(!compressor.add_message(&[0u8; 1]));
		assert_eq!(compressor.get_batch_stats().total_size_bytes, 1024);
	}

	#[test]
	fn test_compressed_batch_round_trip() {
		let config = BatchConfig { Algorithm:CompressionAlgorithm::Gzip, ..Default::default() };
		let mut compressor = MessageCompressor::new(config);

		// 8 x 256 bytes puts the batch above the 1 KiB compression threshold.
		let messages:Vec<Vec<u8>> = (0..8u8).map(|i| vec![i; 256]).collect();
		for message in &messages {
			assert!(compressor.add_message(message));
		}

		let batch = compressor.flush_batch().unwrap();
		assert_eq!(batch.compression_info.algorithm, "gzip");
		assert!(batch.compressed_data.is_some());

		let restored = compressor.decompress_batch(&batch).unwrap();
		assert_eq!(restored, messages);
	}

	#[test]
	fn test_compression_ratio_calculation() {
		let ratio = MessageCompressor::calculate_compression_ratio(1000, 500);
		assert_eq!(ratio, 2.0);
		assert_eq!(MessageCompressor::calculate_compression_ratio(1000, 0), 0.0);

		let savings = MessageCompressor::estimate_savings(1000, 2.0);
		assert_eq!(savings, 500);
		assert_eq!(MessageCompressor::estimate_savings(1000, 1.0), 0);
	}
}