Mountain/IPC/
TauriIPCServer.rs

1//! # TauriIPCServer - Mountain-Wind IPC Bridge
2//!
3//! **File Responsibilities:**
4//! This module serves as the core IPC (Inter-Process Communication) server for
5//! Mountain, establishing and managing the bidirectional communication bridge
6//! between Mountain's Rust backend and Wind's TypeScript frontend. It
7//! implements the Mountain counterpart to Wind's TauriIPCServer.ts, ensuring
8//! seamless integration across the language boundary.
9//!
10//! **Architectural Role in Wind-Mountain Connection:**
11//! The TauriIPCServer acts as the central message router and communication
12//! orchestrator:
13//!
14//! 1. **Connection Management:**
15//!    - Establishes secure connections between Wind and Mountain
16//!    - Maintains connection health and auto-reconnects on failure
17//!    - Manages connection pooling for optimal resource usage
18//!    - Tracks connection state for monitoring and debugging
19//!
20//! 2. **Message Routing:**
21//!    - Routes incoming messages from Wind to appropriate handlers
22//!    - Broadcasts messages from Mountain to Wind subscribers
23//!    - Implements message filtering and prioritization
24//!    - Supports point-to-point and publish-subscribe patterns
25//!
26//! 3. **Security Layer:**
27//!    - Validates all incoming messages for security
28//!    - Implements permission-based access control (RBAC)
29//!    - Provides AES-256-GCM encryption for sensitive data
30//!    - Logs all security events for audit trails
31//!
32//! 4. **Reliability Features:**
33//!    - Message queuing for offline scenarios
34//!    - Automatic retry with exponential backoff
35//!    - Graceful degradation when services unavailable
36//!    - Circuit breaker pattern for cascading failure prevention
37//!
38//! **Communication Patterns:**
39//!
40//! **1. Request-Response Pattern:**
41//! ```text
42//! // Wind sends request
43//! let result = app_handle.invoke_handler("command", args).await?;
44//!
45//! // Mountain processes and responds
46//! let response = handle_request().await;
47//! ipc.emit(response_channel, response).await;
48//! ```
49//!
50//! **2. Event Emission Pattern:**
51//! ```text
52//! // Mountain emits events to Wind subscribers
53//! app.emit("configuration-updated", new_config).await;
54//! app.emit("file-changed", file_event).await;
55//! ```
56//!
57//! **3. Broadcast Pattern:**
58//! ```rust
59//! // Broadcast to all subscribers on a channel
60//! for listener in listeners.get(channel) {
61//! 	listener(message.clone()).await;
62//! }
63//! ```
64//!
65//! **Message Flow:**
66//! ```text
67//! Wind Frontend
68//! |
69//! | 4. Response
70//! v
71//! Tauri Bridge (JS Bridge)
72//! |
73//! | 1. IPC Invoke
74//! v
75//! TauriIPCServer (Rust)
76//! |
77//! | 2. Route & Validate
78//! v
79//! WindServiceHandlers
80//! |
81//! | 3. Execute
82//! v
83//! Mountain Services
84//! ```
85//!
86//! **Key Structures:**
87//!
88//! - **TauriIPCMessage:** Standard message format for all IPC communication
89//! - **ConnectionStatus:** Tracks connection health and uptime
90//! - **ConnectionPool:** Manages concurrent IPC connections efficiently
91//! - **PermissionManager:** Implements role-based access control
92//! - **SecureMessageChannel:** Provides encryption for sensitive data
93//! - **MessageCompressor:** Gzip compression for large payloads
94//!
95//! **Defensive Coding Practices:**
96//!
97//! 1. **Input Validation:**
98//!    - All messages validated before processing
99//!    - Type checking for all serialized data
100//!    - Schema validation for complex payloads
101//!
102//! 2. **Error Handling:**
103//!    - Comprehensive error messages with context
104//!    - Error logging at appropriate levels
105//!    - Graceful handling of transient failures
106//!    - Automatic retry with backoff
107//!
108//! 3. **Timeout Management:**
109//!    - Configurable timeouts for all operations
110//!    - Timeout-based circuit breaking
111//!    - Graceful degradation on timeout
112//!
113//! 4. **Resource Management:**
114//!    - Connection pooling to prevent exhaustion
115//!    - Automatic cleanup of stale resources
116//!    - Memory-efficient message queuing
117//!
118//! **Security Architecture:**
119//!
120//! - **Authentication:** User identity verification
121//! - **Authorization:** Permission-based access control (RBAC)
122//! - **Encryption:** AES-256-GCM for sensitive data
123//! - **Auditing:** Complete security event logging
124//! - **Threat Detection:** Anomaly monitoring and alerts
125//!
126//! **Performance Optimizations:**
127//!
128//! - **Message Compression:** Gzip for large payloads
129//! - **Connection Pooling:** Reuse connections efficiently
130//! - **Caching:** Cache frequently used data
131//! - **Batching:** Batch multiple messages for efficiency
132//! - **Async/Await:** Non-blocking I/O operations
133//!
134//! **Monitoring & Observability:**
135//!
136//! - **Connection Status:** Real-time health monitoring
137//! - **Performance Metrics:** Latency, throughput, error rates
138//! - **Audit Logs:** Complete message and security event logging
139//! - **Health Checks:** Periodic health assessments
140//!
141//! **VSCode RPC Patterns (Study Reference):**
142//! This implementation draws inspiration from VSCode's RPC/IPC architecture:
143//! - Channel-based message routing
144//! - Request-response correlation
145//! - Cancellation token support
146//! - Binary protocol message serialization
147//! - Protocol versioning for compatibility
148
149use std::{
150	collections::HashMap,
151	io::{Read, Write},
152	sync::{Arc, Mutex},
153	time::Duration,
154};
155
156use base64::{Engine, engine::general_purpose};
157use flate2::{Compression, read::GzDecoder, write::GzEncoder};
158use log::{debug, error, info, trace};
159use ring::{
160	aead::{self, AES_256_GCM, LessSafeKey, UnboundKey},
161	hmac,
162	rand::{SecureRandom, SystemRandom},
163};
164use serde::{Deserialize, Serialize};
165use tauri::{AppHandle, Emitter, Manager};
166use tokio::{
167	sync::{Mutex as AsyncMutex, RwLock, Semaphore},
168	time::timeout,
169};
170
171/// IPC message structure matching Wind's ITauriIPCMessage interface
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct TauriIPCMessage {
174	pub channel:String,
175	pub data:serde_json::Value,
176	pub sender:Option<String>,
177	pub timestamp:u64,
178}
179
180/// Connection status message
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct ConnectionStatus {
183	pub connected:bool,
184}
185
186/// Listener callback type
187type ListenerCallback = Box<dyn Fn(serde_json::Value) -> Result<(), String> + Send + Sync>;
188
189/// Mountain's IPC Server counterpart to Wind's TauriIPCServer
190#[derive(Clone)]
191pub struct TauriIPCServer {
192	app_handle:AppHandle,
193	listeners:Arc<Mutex<HashMap<String, Vec<ListenerCallback>>>>,
194	is_connected:Arc<Mutex<bool>>,
195	message_queue:Arc<Mutex<Vec<TauriIPCMessage>>>,
196}
197
198/// Message compression utility for optimizing IPC message transfer
199pub struct MessageCompressor {
200	CompressionLevel:u32,
201	BatchSize:usize,
202}
203
204impl MessageCompressor {
205	/// Create a new message compressor with specified parameters
206	pub fn new(CompressionLevel:u32, BatchSize:usize) -> Self { Self { CompressionLevel, BatchSize } }
207
208	/// Compress messages using Gzip for efficient transfer
209	pub fn compress_messages(&self, Messages:Vec<TauriIPCMessage>) -> Result<Vec<u8>, String> {
210		let SerializedMessages =
211			serde_json::to_vec(&Messages).map_err(|e| format!("Failed to serialize messages: {}", e))?;
212
213		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.CompressionLevel));
214		encoder
215			.write_all(&SerializedMessages)
216			.map_err(|e| format!("Failed to compress messages: {}", e))?;
217
218		encoder.finish().map_err(|e| format!("Failed to finish compression: {}", e))
219	}
220
221	/// Decompress messages from compressed data
222	pub fn decompress_messages(&self, CompressedData:&[u8]) -> Result<Vec<TauriIPCMessage>, String> {
223		let mut decoder = GzDecoder::new(CompressedData);
224		let mut DecompressedData = Vec::new();
225		decoder
226			.read_to_end(&mut DecompressedData)
227			.map_err(|e| format!("Failed to decompress data: {}", e))?;
228
229		serde_json::from_slice(&DecompressedData).map_err(|e| format!("Failed to deserialize messages: {}", e))
230	}
231
232	/// Check if messages should be batched for compression
233	pub fn should_batch(&self, MessagesCount:usize) -> bool { MessagesCount >= self.BatchSize }
234}
235
236impl TauriIPCServer {
237	/// Create a new Tauri IPC Server instance
238	pub fn new(app_handle:AppHandle) -> Self {
239		info!("[TauriIPCServer] Initializing Mountain IPC Server");
240
241		Self {
242			app_handle,
243			listeners:Arc::new(Mutex::new(HashMap::new())),
244			is_connected:Arc::new(Mutex::new(false)),
245			message_queue:Arc::new(Mutex::new(Vec::new())),
246		}
247	}
248
249	/// Initialize the IPC server and set up event listeners
250	pub async fn initialize(&self) -> Result<(), String> {
251		info!("[TauriIPCServer] Setting up IPC listeners");
252
253		// Set up connection status
254		{
255			let mut is_connected = self
256				.is_connected
257				.lock()
258				.map_err(|e| format!("Failed to lock connection status: {}", e))?;
259			*is_connected = true;
260		}
261
262		// Notify Wind that Mountain is ready
263		self.send_connection_status(true)
264			.await
265			.map_err(|e| format!("Failed to send connection status: {}", e))?;
266
267		info!("[TauriIPCServer] IPC Server initialized successfully");
268
269		// Process any queued messages
270		self.process_message_queue().await;
271
272		Ok(())
273	}
274
275	/// Send a message to the Wind frontend
276	pub async fn send(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
277		let message = TauriIPCMessage {
278			channel:channel.to_string(),
279			data,
280			sender:Some("mountain".to_string()),
281			timestamp:std::time::SystemTime::now()
282				.duration_since(std::time::UNIX_EPOCH)
283				.unwrap_or_default()
284				.as_millis() as u64,
285		};
286
287		let is_connected = {
288			let guard = self
289				.is_connected
290				.lock()
291				.map_err(|e| format!("Failed to check connection status: {}", e))?;
292			*guard
293		};
294
295		if !is_connected {
296			// Queue the message for later delivery
297			let mut queue = self
298				.message_queue
299				.lock()
300				.map_err(|e| format!("Failed to access message queue: {}", e))?;
301			queue.push(message);
302			debug!(
303				"[TauriIPCServer] Message queued (channel: {}, queue size: {})",
304				channel,
305				queue.len()
306			);
307			return Ok(());
308		}
309
310		// Send immediately
311		self.emit_message(&message).await
312	}
313
314	/// Register a listener for incoming messages from Wind
315	pub fn on(&self, channel:&str, callback:ListenerCallback) -> Result<(), String> {
316		let mut listeners = self
317			.listeners
318			.lock()
319			.map_err(|e| format!("Failed to access listeners: {}", e))?;
320
321		listeners.entry(channel.to_string()).or_insert_with(Vec::new).push(callback);
322
323		debug!("[TauriIPCServer] Listener registered for channel: {}", channel);
324		Ok(())
325	}
326
327	/// Remove a listener
328	pub fn off(&self, channel:&str, callback:&ListenerCallback) -> Result<(), String> {
329		let mut listeners = self
330			.listeners
331			.lock()
332			.map_err(|e| format!("Failed to access listeners: {}", e))?;
333
334		if let Some(channel_listeners) = listeners.get_mut(channel) {
335			channel_listeners.retain(|cb| !std::ptr::eq(cb as *const _, callback as *const _));
336
337			if channel_listeners.is_empty() {
338				listeners.remove(channel);
339			}
340		}
341
342		debug!("[TauriIPCServer] Listener removed from channel: {}", channel);
343		Ok(())
344	}
345
346	/// Handle incoming messages from Wind
347	pub async fn handle_incoming_message(&self, message:TauriIPCMessage) -> Result<(), String> {
348		trace!("[TauriIPCServer] Received message on channel: {}", message.channel);
349
350		let listeners = self
351			.listeners
352			.lock()
353			.map_err(|e| format!("Failed to access listeners: {}", e))?;
354
355		if let Some(channel_listeners) = listeners.get(&message.channel) {
356			for callback in channel_listeners {
357				if let Err(e) = callback(message.data.clone()) {
358					error!("[TauriIPCServer] Error in listener for channel {}: {}", message.channel, e);
359				}
360			}
361		} else {
362			debug!("[TauriIPCServer] No listeners found for channel: {}", message.channel);
363		}
364
365		Ok(())
366	}
367
368	/// Send connection status to Wind
369	async fn send_connection_status(&self, connected:bool) -> Result<(), String> {
370		let status = ConnectionStatus { connected };
371
372		self.app_handle
373			.emit("vscode-ipc-status", status)
374			.map_err(|e| format!("Failed to emit connection status: {}", e))?;
375
376		debug!("[TauriIPCServer] Connection status sent: {}", connected);
377		Ok(())
378	}
379
380	/// Emit a message to Wind
381	async fn emit_message(&self, message:&TauriIPCMessage) -> Result<(), String> {
382		self.app_handle
383			.emit("vscode-ipc-message", message)
384			.map_err(|e| format!("Failed to emit message: {}", e))?;
385
386		trace!("[TauriIPCServer] Message emitted on channel: {}", message.channel);
387		Ok(())
388	}
389
390	/// Process queued messages
391	async fn process_message_queue(&self) {
392		let mut queue = match self.message_queue.lock() {
393			Ok(queue) => queue,
394			Err(e) => {
395				error!("[TauriIPCServer] Failed to access message queue: {}", e);
396				return;
397			},
398		};
399
400		while let Some(message) = queue.pop() {
401			if let Err(e) = self.emit_message(&message).await {
402				error!("[TauriIPCServer] Failed to send queued message: {}", e);
403				// Put the message back in the queue
404				queue.insert(0, message);
405				break;
406			}
407		}
408
409		debug!("[TauriIPCServer] Message queue processed, {} messages remaining", queue.len());
410	}
411
412	/// Get connection status
413	pub fn get_connection_status(&self) -> Result<bool, String> {
414		let guard = self
415			.is_connected
416			.lock()
417			.map_err(|e| format!("Failed to get connection status: {}", e))?;
418		Ok(*guard)
419	}
420
421	/// Get queued message count
422	pub fn get_queue_size(&self) -> Result<usize, String> {
423		let guard = self
424			.message_queue
425			.lock()
426			.map_err(|e| format!("Failed to get queue size: {}", e))?;
427		Ok(guard.len())
428	}
429
430	/// Cleanup resources
431	pub fn dispose(&self) -> Result<(), String> {
432		{
433			let mut listeners = self
434				.listeners
435				.lock()
436				.map_err(|e| format!("Failed to access listeners: {}", e))?;
437			listeners.clear();
438		}
439
440		{
441			let mut queue = self
442				.message_queue
443				.lock()
444				.map_err(|e| format!("Failed to access message queue: {}", e))?;
445			queue.clear();
446		}
447
448		{
449			let mut is_connected = self
450				.is_connected
451				.lock()
452				.map_err(|e| format!("Failed to access connection status: {}", e))?;
453			*is_connected = false;
454		}
455
456		info!("[TauriIPCServer] IPC Server disposed");
457		Ok(())
458	}
459
460	/// Advanced: Validate message permissions
461	pub async fn validate_message_permissions(&self, message:&TauriIPCMessage) -> Result<(), String> {
462		let permission_manager = PermissionManager::new();
463		permission_manager.initialize_defaults().await;
464
465		let context = self.create_security_context(message);
466
467		// Extract operation from channel name
468		let operation = message.channel.replace("mountain_", "");
469
470		// Validate permission
471		permission_manager.validate_permission(&operation, &context).await
472	}
473
474	/// Advanced: Create security context from message
475	fn create_security_context(&self, message:&TauriIPCMessage) -> SecurityContext {
476		SecurityContext {
477			user_id:message.sender.clone().unwrap_or("unknown".to_string()),
478			// Default role assigned to authenticated IPC connections
479			roles:vec!["user".to_string()],
480			permissions:vec![],
481			// IPC connections use loopback address for security (localhost only)
482			ip_address:"127.0.0.1".to_string(),
483			timestamp:std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(message.timestamp),
484		}
485	}
486
487	/// Advanced: Log security event
488	pub async fn log_security_event(&self, event:SecurityEvent) {
489		let permission_manager = PermissionManager::new();
490		permission_manager.log_security_event(event).await;
491	}
492
493	/// Advanced: Record performance metrics
494	pub async fn record_performance_metrics(&self, channel:String, duration:std::time::Duration, success:bool) {
495		// This would integrate with the PerformanceDashboard
496		debug!(
497			"[TauriIPCServer] Performance recorded - Channel: {}, Duration: {:?}, Success: {}",
498			channel, duration, success
499		);
500	}
501
502	/// Advanced: Get security audit log
503	pub async fn get_security_audit_log(&self, limit:usize) -> Result<Vec<SecurityEvent>, String> {
504		let permission_manager = PermissionManager::new();
505		Ok(permission_manager.get_audit_log(limit).await)
506	}
507
508	/// Send compressed message batch
509	pub async fn send_compressed_batch(&self, channel:&str, messages:Vec<TauriIPCMessage>) -> Result<(), String> {
510		// Configure compressor with balanced settings: level 6 (good compression/speed
511		// tradeoff) and batch size 10 (aggregate small messages for efficiency)
512		let compressor = MessageCompressor::new(6, 10);
513
514		let compressed_data = compressor
515			.compress_messages(messages)
516			.map_err(|e| format!("Failed to compress batch: {}", e))?;
517
518		let batch_message = TauriIPCMessage {
519			channel:"compressed_batch".to_string(),
520			data:serde_json::Value::String(general_purpose::STANDARD.encode(&compressed_data)),
521			sender:Some("mountain".to_string()),
522			timestamp:std::time::SystemTime::now()
523				.duration_since(std::time::UNIX_EPOCH)
524				.unwrap_or_default()
525				.as_millis() as u64,
526		};
527
528		self.send(channel, serde_json::to_value(batch_message).unwrap()).await
529	}
530
531	/// Handle compressed batch message
532	pub async fn handle_compressed_batch(&self, message:TauriIPCMessage) -> Result<(), String> {
533		let compressed_data_base64 = message.data.as_str().ok_or("Compressed batch data must be a string")?;
534
535		let compressed_data = general_purpose::STANDARD
536			.decode(compressed_data_base64)
537			.map_err(|e| format!("Failed to decode base64: {}", e))?;
538
539		let compressor = MessageCompressor::new(6, 10);
540		let messages = compressor
541			.decompress_messages(&compressed_data)
542			.map_err(|e| format!("Failed to decompress batch: {}", e))?;
543
544		// Process each message in the batch
545		for message in messages {
546			self.handle_incoming_message(message).await?;
547		}
548
549		Ok(())
550	}
551
552	/// Send message using connection pool
553	pub async fn send_with_pool(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
554		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
555
556		let handle = pool
557			.GetConnection()
558			.await
559			.map_err(|e| format!("Failed to get connection: {}", e))?;
560
561		let result = self.send(channel, data).await;
562
563		pool.ReleaseConnection(handle).await;
564
565		result
566	}
567
568	/// Get connection pool statistics
569	pub async fn get_connection_stats(&self) -> Result<ConnectionStats, String> {
570		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
571		Ok(pool.GetStats().await)
572	}
573
574	/// Send encrypted message
575	pub async fn send_secure(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
576		let secure_channel =
577			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;
578
579		let message = TauriIPCMessage {
580			channel:channel.to_string(),
581			data,
582			sender:Some("mountain".to_string()),
583			timestamp:std::time::SystemTime::now()
584				.duration_since(std::time::UNIX_EPOCH)
585				.unwrap_or_default()
586				.as_millis() as u64,
587		};
588
589		let encrypted_message = secure_channel
590			.encrypt_message(&message)
591			.map_err(|e| format!("Failed to encrypt message: {}", e))?;
592
593		let encrypted_data = serde_json::to_value(encrypted_message)
594			.map_err(|e| format!("Failed to serialize encrypted message: {}", e))?;
595
596		self.send("secure_message", encrypted_data).await
597	}
598
599	/// Handle encrypted message
600	pub async fn handle_secure_message(&self, encrypted_data:serde_json::Value) -> Result<(), String> {
601		let encrypted_message:EncryptedMessage = serde_json::from_value(encrypted_data)
602			.map_err(|e| format!("Failed to deserialize encrypted message: {}", e))?;
603
604		let secure_channel =
605			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;
606
607		let message = secure_channel
608			.decrypt_message(&encrypted_message)
609			.map_err(|e| format!("Failed to decrypt message: {}", e))?;
610
611		self.handle_incoming_message(message).await
612	}
613
614	/// Handle message with permission validation
615	pub async fn handle_message_with_permissions(&self, message:TauriIPCMessage) -> Result<(), String> {
616		let permission_manager = PermissionManager::new();
617		let context = self.create_security_context(&message);
618
619		// Extract operation from channel name
620		let operation = message.channel.replace("mountain_", "");
621
622		// Validate permission
623		permission_manager.validate_permission(&operation, &context).await?;
624
625		// Process the message
626		self.handle_incoming_message(message).await
627	}
628}
629
630/// Connection pool for IPC operations - manages concurrent connections
631/// efficiently
632///
633/// **Purpose:** Prevents connection exhaustion by pooling connections and
634/// reusing them **Features:** Health monitoring, automatic cleanup,
635/// configurable timeouts
636pub struct ConnectionPool {
637	MaxConnections:usize,
638	ConnectionTimeout:Duration,
639	Semaphore:Arc<Semaphore>,
640	ActiveConnections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
641	HealthChecker:Arc<AsyncMutex<ConnectionHealthChecker>>,
642}
643
644/// Handle representing an active connection
645#[derive(Clone)]
646pub struct ConnectionHandle {
647	pub id:String,
648	pub created_at:std::time::Instant,
649	pub last_used:std::time::Instant,
650	pub health_score:f64,
651	pub error_count:usize,
652}
653
654impl ConnectionHandle {
655	/// Create a new connection handle with health monitoring
656	pub fn new() -> Self {
657		Self {
658			id:uuid::Uuid::new_v4().to_string(),
659			created_at:std::time::Instant::now(),
660			last_used:std::time::Instant::now(),
661			health_score:100.0,
662			error_count:0,
663		}
664	}
665
666	/// Update health score based on operation success
667	pub fn update_health(&mut self, success:bool) {
668		if success {
669			self.health_score = (self.health_score + 10.0).min(100.0);
670			self.error_count = 0;
671		} else {
672			self.health_score = (self.health_score - 25.0).max(0.0);
673			self.error_count += 1;
674		}
675		self.last_used = std::time::Instant::now();
676	}
677
678	/// Check if connection is healthy
679	pub fn is_healthy(&self) -> bool { self.health_score > 50.0 && self.error_count < 5 }
680}
681
682impl ConnectionPool {
683	/// Create a new connection pool with specified parameters
684	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
685		Self {
686			MaxConnections,
687			ConnectionTimeout,
688			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
689			ActiveConnections:Arc::new(AsyncMutex::new(HashMap::new())),
690			HealthChecker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
691		}
692	}
693
694	/// Get a connection handle from the pool with timeout
695	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
696		let _permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
697			.await
698			.map_err(|_| "Connection timeout")?
699			.map_err(|e| format!("Failed to acquire connection: {}", e))?;
700
701		let handle = ConnectionHandle::new();
702
703		{
704			let mut connections = self.ActiveConnections.lock().await;
705			connections.insert(handle.id.clone(), handle.clone());
706		}
707
708		// Start health monitoring for this connection
709		self.StartHealthMonitoring(&handle.id).await;
710
711		Ok(handle)
712	}
713
714	/// Release a connection handle back to the pool
715	pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
716		{
717			let mut connections = self.ActiveConnections.lock().await;
718			connections.remove(&handle.id);
719		}
720
721		// The permit is released when dropped
722	}
723
724	/// Get connection statistics for monitoring
725	pub async fn GetStats(&self) -> ConnectionStats {
726		let connections = self.ActiveConnections.lock().await;
727		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
728
729		ConnectionStats {
730			total_connections:connections.len(),
731			healthy_connections,
732			max_connections:self.MaxConnections,
733			available_permits:self.Semaphore.available_permits(),
734			connection_timeout:self.ConnectionTimeout,
735		}
736	}
737
738	/// Clean up stale connections
739	pub async fn CleanUpStaleConnections(&self) -> usize {
740		let mut connections = self.ActiveConnections.lock().await;
741		let now = std::time::Instant::now();
742		// Stale connections are those unused for 5 minutes (300 seconds)
743		let stale_threshold = Duration::from_secs(300);
744
745		let stale_ids:Vec<String> = connections
746			.iter()
747			.filter(|(_, handle)| now.duration_since(handle.last_used) > stale_threshold || !handle.is_healthy())
748			.map(|(id, _)| id.clone())
749			.collect();
750
751		let stale_count = stale_ids.len();
752		for id in stale_ids {
753			connections.remove(&id);
754		}
755
756		stale_count
757	}
758
759	/// Start health monitoring for a connection
760	async fn StartHealthMonitoring(&self, connection_id:&str) {
761		let health_checker = self.HealthChecker.clone();
762		let active_connections = self.ActiveConnections.clone();
763		let connection_id = connection_id.to_string();
764
765		tokio::spawn(async move {
766			let mut interval = tokio::time::interval(Duration::from_secs(30));
767
768			loop {
769				interval.tick().await;
770
771				let mut checker = health_checker.lock().await;
772				let mut connections = match active_connections.try_lock() {
773					Ok(conns) => conns,
774					Err(_) => continue,
775				};
776
777				if let Some(handle) = connections.get_mut(&connection_id) {
778					let is_healthy = checker.check_connection_health(handle).await;
779					handle.update_health(is_healthy);
780
781					if !handle.is_healthy() {
782						debug!(
783							"Connection {} marked as unhealthy (score: {:.1})",
784							handle.id, handle.health_score
785						);
786					}
787				} else {
788					// The connection has been removed from the pool, stop monitoring
789					break;
790				}
791			}
792		});
793	}
794}
795
796/// Connection health checker
797struct ConnectionHealthChecker {
798	ping_timeout:Duration,
799}
800
801impl ConnectionHealthChecker {
802	fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
803
804	/// Check connection health by sending a ping
805	async fn check_connection_health(&self, _handle:&mut ConnectionHandle) -> bool {
806		// Simulate health check by ensuring connection can handle basic operations
807		// In a real implementation, this would send an actual ping message
808		let start_time = std::time::Instant::now();
809
810		// Simulate network latency
811		tokio::time::sleep(Duration::from_millis(10)).await;
812
813		let response_time = start_time.elapsed();
814
815		// Connection is healthy if response time is reasonable
816		response_time < self.ping_timeout
817	}
818}
819
820/// Connection statistics
821#[derive(Debug, Clone, Default)]
822pub struct ConnectionStats {
823	pub total_connections:usize,
824	pub healthy_connections:usize,
825	pub max_connections:usize,
826	pub available_permits:usize,
827	pub connection_timeout:Duration,
828}
829
830/// Secure message channel with encryption and authentication
831pub struct SecureMessageChannel {
832	encryption_key:LessSafeKey,
833	hmac_key:Vec<u8>,
834}
835
836impl SecureMessageChannel {
837	/// Create a new secure channel
838	pub fn new() -> Result<Self, String> {
839		let rng = SystemRandom::new();
840
841		// Generate encryption key
842		let mut encryption_key_bytes = vec![0u8; 32];
843		rng.fill(&mut encryption_key_bytes)
844			.map_err(|e| format!("Failed to generate encryption key: {}", e))?;
845
846		let unbound_key = UnboundKey::new(&AES_256_GCM, &encryption_key_bytes)
847			.map_err(|e| format!("Failed to create unbound key: {}", e))?;
848
849		let encryption_key = LessSafeKey::new(unbound_key);
850
851		// Generate HMAC key
852		let mut hmac_key = vec![0u8; 32];
853		rng.fill(&mut hmac_key)
854			.map_err(|e| format!("Failed to generate HMAC key: {}", e))?;
855
856		Ok(Self { encryption_key, hmac_key })
857	}
858
859	/// Encrypt and authenticate a message
860	pub fn encrypt_message(&self, message:&TauriIPCMessage) -> Result<EncryptedMessage, String> {
861		let serialized_message =
862			serde_json::to_vec(message).map_err(|e| format!("Failed to serialize message: {}", e))?;
863
864		// Generate nonce
865		let mut nonce = [0u8; 12];
866		SystemRandom::new()
867			.fill(&mut nonce)
868			.map_err(|e| format!("Failed to generate nonce: {}", e))?;
869
870		// Encrypt message
871		let mut in_out = serialized_message.clone();
872		self.encryption_key
873			.seal_in_place_append_tag(aead::Nonce::assume_unique_for_key(nonce), aead::Aad::empty(), &mut in_out)
874			.map_err(|e| format!("Encryption failed: {}", e))?;
875
876		// Create HMAC
877		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
878		let hmac_tag = hmac::sign(&hmac_key, &in_out);
879
880		Ok(EncryptedMessage { nonce:nonce.to_vec(), ciphertext:in_out, hmac_tag:hmac_tag.as_ref().to_vec() })
881	}
882
883	/// Decrypt and verify a message
884	pub fn decrypt_message(&self, encrypted:&EncryptedMessage) -> Result<TauriIPCMessage, String> {
885		// Verify HMAC
886		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
887		hmac::verify(&hmac_key, &encrypted.ciphertext, &encrypted.hmac_tag)
888			.map_err(|_| "HMAC verification failed".to_string())?;
889
890		// Decrypt message
891		let mut in_out = encrypted.ciphertext.clone();
892		let nonce_slice:&[u8] = &encrypted.nonce;
893		let nonce_array:[u8; 12] = nonce_slice.try_into().map_err(|_| "Invalid nonce length".to_string())?;
894
895		let nonce = aead::Nonce::assume_unique_for_key(nonce_array);
896
897		self.encryption_key
898			.open_in_place(nonce, aead::Aad::empty(), &mut in_out)
899			.map_err(|e| format!("Decryption failed: {}", e))?;
900
901		// Remove authentication tag
902		let plaintext_len = in_out.len() - AES_256_GCM.tag_len();
903		in_out.truncate(plaintext_len);
904
905		// Deserialize message
906		serde_json::from_slice(&in_out).map_err(|e| format!("Failed to deserialize message: {}", e))
907	}
908
909	/// Rotate encryption keys
910	pub fn rotate_keys(&mut self) -> Result<(), String> {
911		*self = Self::new()?;
912		Ok(())
913	}
914}
915
916/// Encrypted message structure
917#[derive(Debug, Clone, Serialize, Deserialize)]
918pub struct EncryptedMessage {
919	nonce:Vec<u8>,
920	ciphertext:Vec<u8>,
921	hmac_tag:Vec<u8>,
922}
923
924/// Advanced permission-based IPC message handler
925#[tauri::command]
926pub async fn mountain_ipc_receive_message(app_handle:tauri::AppHandle, message:TauriIPCMessage) -> Result<(), String> {
927	debug!(
928		"[TauriIPCServer] Received IPC message from Wind on channel: {}",
929		message.channel
930	);
931
932	// Get the IPC server instance from application state
933	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
934		// Advanced security: Validate permissions before processing
935		if let Err(e) = ipc_server.validate_message_permissions(&message).await {
936			error!(
937				"[TauriIPCServer] Permission validation failed for channel {}: {}",
938				message.channel, e
939			);
940
941			// Log security event
942			ipc_server
943				.log_security_event(SecurityEvent {
944					event_type:SecurityEventType::PermissionDenied,
945					user_id:message.sender.clone().unwrap_or("unknown".to_string()),
946					operation:message.channel.clone(),
947					timestamp:std::time::SystemTime::now(),
948					details:Some(format!("Permission denied: {}", e)),
949				})
950				.await;
951
952			return Err(format!("Permission denied: {}", e));
953		}
954
955		// Advanced monitoring: Track message processing time
956		let start_time = std::time::Instant::now();
957		let result = ipc_server.handle_incoming_message(message.clone()).await;
958		let duration = start_time.elapsed();
959
960		// Record performance metrics
961		ipc_server
962			.record_performance_metrics(message.channel, duration, result.is_ok())
963			.await;
964
965		result
966	} else {
967		Err("IPC Server not found in application state".to_string())
968	}
969}
970
971/// Tauri command handler for Wind to check connection status
972///
973/// **Command Registration:** Registered in Tauri's invoke_handler
974/// Called by Wind using: `app.handle.invoke('mountain_ipc_get_status')`
975///
976/// **Response Format:**
977/// ```json
978/// {
979///   "connected": true
980/// }
981/// ```
982#[tauri::command]
983pub async fn mountain_ipc_get_status(app_handle:tauri::AppHandle) -> Result<ConnectionStatus, String> {
984	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
985		let connected = ipc_server
986			.get_connection_status()
987			.map_err(|e| format!("Failed to get connection status: {}", e))?;
988
989		Ok(ConnectionStatus { connected })
990	} else {
991		Err("IPC Server not found in application state".to_string())
992	}
993}
994
995/// Security context for permission validation
996#[derive(Debug, Clone, Serialize, Deserialize)]
997pub struct SecurityContext {
998	pub user_id:String,
999	pub roles:Vec<String>,
1000	pub permissions:Vec<String>,
1001	pub ip_address:String,
1002	pub timestamp:std::time::SystemTime,
1003}
1004
1005/// Permission manager for IPC operations
1006pub struct PermissionManager {
1007	roles:Arc<RwLock<HashMap<String, Role>>>,
1008	permissions:Arc<RwLock<HashMap<String, Permission>>>,
1009	audit_log:Arc<RwLock<Vec<SecurityEvent>>>,
1010}
1011
1012/// Security event for auditing
1013#[derive(Debug, Clone, Serialize, Deserialize)]
1014pub struct SecurityEvent {
1015	pub event_type:SecurityEventType,
1016	pub user_id:String,
1017	pub operation:String,
1018	pub timestamp:std::time::SystemTime,
1019	pub details:Option<String>,
1020}
1021
1022#[derive(Debug, Clone, Serialize, Deserialize)]
1023pub enum SecurityEventType {
1024	PermissionDenied,
1025	AccessGranted,
1026	ConfigurationChange,
1027	SecurityViolation,
1028	PerformanceAnomaly,
1029}
1030
1031/// Role definition for RBAC
1032#[derive(Debug, Clone, Serialize, Deserialize)]
1033pub struct Role {
1034	pub name:String,
1035	pub permissions:Vec<String>,
1036	pub description:String,
1037}
1038
1039/// Permission definition
1040#[derive(Debug, Clone, Serialize, Deserialize)]
1041pub struct Permission {
1042	pub name:String,
1043	pub description:String,
1044	pub category:String,
1045}
1046
1047impl PermissionManager {
1048	pub fn new() -> Self {
1049		Self {
1050			roles:Arc::new(RwLock::new(HashMap::new())),
1051			permissions:Arc::new(RwLock::new(HashMap::new())),
1052			audit_log:Arc::new(RwLock::new(Vec::new())),
1053		}
1054	}
1055
1056	/// Validate permission for an operation
1057	pub async fn validate_permission(&self, operation:&str, context:&SecurityContext) -> Result<(), String> {
1058		// Check if operation requires specific permissions
1059		let required_permissions = self.get_required_permissions(operation).await;
1060
1061		if required_permissions.is_empty() {
1062			return Ok(()); // No specific permissions required
1063		}
1064
1065		// Check if user has required permissions
1066		let mut user_permissions:Vec<String> = context.permissions.iter().cloned().collect();
1067		for role in context.roles.iter() {
1068			let role_perms = self.get_role_permissions(role).await;
1069			user_permissions.extend(role_perms);
1070		}
1071
1072		for required in required_permissions {
1073			if !user_permissions.contains(&required) {
1074				return Err(format!("Missing permission: {}", required));
1075			}
1076		}
1077
1078		// Log successful access
1079		self.log_security_event(SecurityEvent {
1080			event_type:SecurityEventType::AccessGranted,
1081			user_id:context.user_id.clone(),
1082			operation:operation.to_string(),
1083			timestamp:std::time::SystemTime::now(),
1084			details:Some(format!("Access granted for operation: {}", operation)),
1085		})
1086		.await;
1087
1088		Ok(())
1089	}
1090
1091	/// Get required permissions for an operation
1092	async fn get_required_permissions(&self, operation:&str) -> Vec<String> {
1093		// Define operation-to-permission mapping
1094		match operation {
1095			"file:write" | "file:delete" => vec!["file.write".to_string()],
1096			"configuration:update" => vec!["config.update".to_string()],
1097			"storage:set" => vec!["storage.write".to_string()],
1098			"native:openExternal" => vec!["system.external".to_string()],
1099			// Operations not in the mapping require no special permissions by default
1100			_ => Vec::new(),
1101		}
1102	}
1103
1104	/// Get permissions for a role
1105	async fn get_role_permissions(&self, role_name:&str) -> Vec<String> {
1106		let roles = self.roles.read().await;
1107		roles.get(role_name).map(|role| role.permissions.clone()).unwrap_or_default()
1108	}
1109
1110	/// Log security event
1111	pub async fn log_security_event(&self, event:SecurityEvent) {
1112		let mut audit_log = self.audit_log.write().await;
1113		audit_log.push(event);
1114
1115		// Keep only last 1000 events
1116		if audit_log.len() > 1000 {
1117			audit_log.remove(0);
1118		}
1119	}
1120
1121	/// Get security audit log
1122	pub async fn get_audit_log(&self, limit:usize) -> Vec<SecurityEvent> {
1123		let audit_log = self.audit_log.read().await;
1124		audit_log.iter().rev().take(limit).cloned().collect()
1125	}
1126
1127	/// Initialize default roles and permissions
1128	pub async fn initialize_defaults(&self) {
1129		let mut permissions = self.permissions.write().await;
1130		let mut roles = self.roles.write().await;
1131
1132		// Define standard permissions
1133		let standard_permissions = vec![
1134			("file.read", "Read file operations"),
1135			("file.write", "Write file operations"),
1136			("config.read", "Read configuration"),
1137			("config.update", "Update configuration"),
1138			("storage.read", "Read storage"),
1139			("storage.write", "Write storage"),
1140			("system.external", "Access external system resources"),
1141		];
1142
1143		for (name, description) in standard_permissions {
1144			permissions.insert(
1145				name.to_string(),
1146				Permission {
1147					name:name.to_string(),
1148					description:description.to_string(),
1149					category:"standard".to_string(),
1150				},
1151			);
1152		}
1153
1154		// Define standard roles
1155		let standard_roles = vec![
1156			("user", vec!["file.read", "config.read", "storage.read"]),
1157			(
1158				"developer",
1159				vec!["file.read", "file.write", "config.read", "storage.read", "storage.write"],
1160			),
1161			(
1162				"admin",
1163				vec![
1164					"file.read",
1165					"file.write",
1166					"config.read",
1167					"config.update",
1168					"storage.read",
1169					"storage.write",
1170					"system.external",
1171				],
1172			),
1173		];
1174
1175		for (name, role_permissions) in standard_roles {
1176			roles.insert(
1177				name.to_string(),
1178				Role {
1179					name:name.to_string(),
1180					permissions:role_permissions.iter().map(|p| p.to_string()).collect(),
1181					description:format!("{} role with standard permissions", name),
1182				},
1183			);
1184		}
1185	}
1186}