grove/Transport/
Strategy.rs

1//! Transport Strategy Module
2//!
3//! Defines the transport strategy trait and types for different
4//! communication methods (gRPC, IPC, WASM).
5
6use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{TransportConfig, GrpcTransport, IPCTransportImpl, WASMTransportImpl};
13
14/// Transport strategy trait
15///
16/// All transport implementations must implement this trait to provide
17/// a common interface for connecting, sending, and closing connections.
18#[async_trait]
19pub trait TransportStrategy: Send + Sync {
20	/// Error type for this transport
21	type Error: std::error::Error + Send + Sync + 'static;
22
23	/// Connect to the transport endpoint
24	async fn connect(&self) -> Result<(), Self::Error>;
25
26	/// Send a request and receive a response
27	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;
28
29	/// Send data without expecting a response (fire and forget)
30	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;
31
32	/// Close the transport connection
33	async fn close(&self) -> Result<(), Self::Error>;
34
35	/// Check if the transport is connected
36	fn is_connected(&self) -> bool;
37
38	/// Get the transport type identifier
39	fn transport_type(&self) -> TransportType;
40}
41
42/// Transport type enumeration
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44pub enum TransportType {
45	/// gRPC transport
46	gRPC,
47	/// Inter-process communication
48	IPC,
49	/// Direct WASM module communication
50	WASM,
51	/// Unknown/unspecified transport
52	Unknown,
53}
54
55impl fmt::Display for TransportType {
56	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
57		match self {
58			Self::gRPC => write!(f, "grpc"),
59			Self::IPC => write!(f, "ipc"),
60			Self::WASM => write!(f, "wasm"),
61			Self::Unknown => write!(f, "unknown"),
62		}
63	}
64}
65
66impl std::str::FromStr for TransportType {
67	type Err = anyhow::Error;
68
69	fn from_str(s:&str) -> Result<Self, Self::Err> {
70		match s.to_lowercase().as_str() {
71			"grpc" => Ok(Self::gRPC),
72			"ipc" => Ok(Self::IPC),
73			"wasm" => Ok(Self::WASM),
74			_ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
75		}
76	}
77}
78
79/// Transport enumeration
80///
81/// Union type for all supported transport implementations.
82#[derive(Debug)]
83pub enum Transport {
84	/// gRPC-based transport
85	gRPC(GrpcTransport),
86	/// Inter-process communication transport
87	IPC(IPCTransportImpl),
88	/// Direct WASM module transport
89	WASM(WASMTransportImpl),
90}
91
92impl Transport {
93	/// Get the transport type
94	pub fn transport_type(&self) -> TransportType {
95		match self {
96			Self::gRPC(_) => TransportType::gRPC,
97			Self::IPC(_) => TransportType::IPC,
98			Self::WASM(_) => TransportType::WASM,
99		}
100	}
101
102	/// Connect to the transport
103	pub async fn connect(&self) -> anyhow::Result<()> {
104		match self {
105			Self::gRPC(transport) => {
106				transport
107					.connect()
108					.await
109					.map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
110			},
111			Self::IPC(transport) => {
112				transport
113					.connect()
114					.await
115					.map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
116			},
117			Self::WASM(transport) => {
118				transport
119					.connect()
120					.await
121					.map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
122			},
123		}
124	}
125
126	/// Send a request and receive a response
127	pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
128		match self {
129			Self::gRPC(transport) => {
130				transport
131					.send(request)
132					.await
133					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
134			},
135			Self::IPC(transport) => {
136				transport
137					.send(request)
138					.await
139					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
140			},
141			Self::WASM(transport) => {
142				transport
143					.send(request)
144					.await
145					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
146			},
147		}
148	}
149
150	/// Send data without expecting a response
151	pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
152		match self {
153			Self::gRPC(transport) => {
154				transport
155					.send_no_response(data)
156					.await
157					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
158			},
159			Self::IPC(transport) => {
160				transport
161					.send_no_response(data)
162					.await
163					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
164			},
165			Self::WASM(transport) => {
166				transport
167					.send_no_response(data)
168					.await
169					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
170			},
171		}
172	}
173
174	/// Close the transport
175	pub async fn close(&self) -> anyhow::Result<()> {
176		match self {
177			Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
178			Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
179			Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
180		}
181	}
182
183	/// Check if the transport is connected
184	pub fn is_connected(&self) -> bool {
185		match self {
186			Self::gRPC(transport) => transport.is_connected(),
187			Self::IPC(transport) => transport.is_connected(),
188			Self::WASM(transport) => transport.is_connected(),
189		}
190	}
191
192	/// Get gRPC transport reference (if applicable)
193	pub fn as_grpc(&self) -> Option<&GrpcTransport> {
194		match self {
195			Self::gRPC(transport) => Some(transport),
196			_ => None,
197		}
198	}
199
200	/// Get IPC transport reference (if applicable)
201	pub fn as_ipc(&self) -> Option<&IPCTransportImpl> {
202		match self {
203			Self::IPC(transport) => Some(transport),
204			_ => None,
205		}
206	}
207
208	/// Get WASM transport reference (if applicable)
209	pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
210		match self {
211			Self::WASM(transport) => Some(transport),
212			_ => None,
213		}
214	}
215}
216
217impl Default for Transport {
218	fn default() -> Self {
219		// Default to gRPC with localhost address
220		Self::gRPC(GrpcTransport::new("127.0.0.1:50050").unwrap_or_else(|_| {
221			GrpcTransport::new("0.0.0.0:50050").expect("Failed to create default gRPC transport")
222		}))
223	}
224}
225
226impl fmt::Display for Transport {
227	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
228}
229
230/// Transport message wrapper
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct TransportMessage {
233	/// Message type identifier
234	pub message_type:String,
235	/// Message ID for correlation
236	pub message_id:String,
237	/// Timestamp (Unix epoch)
238	pub timestamp:u64,
239	/// Message payload
240	pub payload:Bytes,
241	/// Optional metadata
242	pub metadata:Option<serde_json::Value>,
243}
244
245impl TransportMessage {
246	/// Create a new transport message
247	pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
248		Self {
249			message_type:message_type.into(),
250			message_id:uuid::Uuid::new_v4().to_string(),
251			timestamp:std::time::SystemTime::now()
252				.duration_since(std::time::UNIX_EPOCH)
253				.map(|d| d.as_secs())
254				.unwrap_or(0),
255			payload,
256			metadata:None,
257		}
258	}
259
260	/// Set metadata for the message
261	pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
262		self.metadata = Some(metadata);
263		self
264	}
265
266	/// Serialize the message to bytes
267	pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
268		serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
269	}
270
271	/// Deserialize message from bytes
272	pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
273		serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
274	}
275}
276
277/// Transport statistics
278#[derive(Debug, Clone, Default, Serialize, Deserialize)]
279pub struct TransportStats {
280	/// Number of messages sent
281	pub messages_sent:u64,
282	/// Number of messages received
283	pub messages_received:u64,
284	/// Number of errors encountered
285	pub errors:u64,
286	/// Total bytes sent
287	pub bytes_sent:u64,
288	/// Total bytes received
289	pub bytes_received:u64,
290	/// Average latency in microseconds
291	pub avg_latency_us:u64,
292	/// Connection uptime in seconds
293	pub uptime_seconds:u64,
294}
295
296impl TransportStats {
297	/// Update statistics with a sent message
298	pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
299		self.messages_sent += 1;
300		self.bytes_sent += bytes;
301
302		// Update average latency
303		if self.messages_sent > 0 {
304			self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
305		}
306	}
307
308	/// Update statistics with a received message
309	pub fn record_received(&mut self, bytes:u64) {
310		self.messages_received += 1;
311		self.bytes_received += bytes;
312	}
313
314	/// Record an error
315	pub fn record_error(&mut self) { self.errors += 1; }
316}
317
#[cfg(test)]
mod tests {
	use super::*;

	/// Every variant, including `Unknown`, has a lowercase display form.
	#[test]
	fn test_transport_type_to_string() {
		assert_eq!(TransportType::gRPC.to_string(), "grpc");
		assert_eq!(TransportType::IPC.to_string(), "ipc");
		assert_eq!(TransportType::WASM.to_string(), "wasm");
		assert_eq!(TransportType::Unknown.to_string(), "unknown");
	}

	/// Parsing accepts the three concrete kinds in any letter case and
	/// rejects everything else.
	#[test]
	fn test_transport_type_from_str() {
		assert_eq!("grpc".parse::<TransportType>().unwrap(), TransportType::gRPC);
		assert_eq!("ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
		assert_eq!("wasm".parse::<TransportType>().unwrap(), TransportType::WASM);

		// Letter case is ignored.
		assert_eq!("GRPC".parse::<TransportType>().unwrap(), TransportType::gRPC);
		assert_eq!("Ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
		assert_eq!("wAsM".parse::<TransportType>().unwrap(), TransportType::WASM);

		// `Unknown` is displayable but deliberately not parseable.
		assert!("unknown".parse::<TransportType>().is_err());
		assert!("".parse::<TransportType>().is_err());
	}

	/// The default transport is gRPC and displays as such.
	#[test]
	fn test_transport_display() {
		let transport = Transport::default();
		assert_eq!(transport.transport_type(), TransportType::gRPC);
		assert_eq!(transport.to_string(), "Transport(grpc)");
	}

	/// A new message carries the given type and payload, a non-empty id and
	/// no metadata.
	#[test]
	fn test_transport_message_creation() {
		let message = TransportMessage::new("test_type", Bytes::from("hello"));
		assert_eq!(message.message_type, "test_type");
		assert_eq!(message.payload, Bytes::from("hello"));
		assert!(!message.message_id.is_empty());
		assert!(message.metadata.is_none());
	}

	/// Serializing and deserializing a message preserves every field.
	#[test]
	fn test_transport_message_serialization() {
		let message =
			TransportMessage::new("test", Bytes::from("data")).with_metadata(serde_json::json!({ "key": "value" }));
		let bytes = message.to_bytes().unwrap();
		let deserialized = TransportMessage::from_bytes(&bytes).unwrap();

		assert_eq!(deserialized.message_type, message.message_type);
		assert_eq!(deserialized.message_id, message.message_id);
		assert_eq!(deserialized.timestamp, message.timestamp);
		assert_eq!(deserialized.payload, message.payload);
		assert_eq!(deserialized.metadata, message.metadata);
	}

	/// Input that is not a serialized message is rejected.
	#[test]
	fn test_transport_message_rejects_invalid_input() {
		assert!(TransportMessage::from_bytes(b"not json").is_err());
	}

	/// Counters accumulate and the latency average tracks the integer mean.
	#[test]
	fn test_transport_stats() {
		let mut stats = TransportStats::default();
		stats.record_sent(100, 1000);
		stats.record_received(50);
		stats.record_error();

		assert_eq!(stats.messages_sent, 1);
		assert_eq!(stats.messages_received, 1);
		assert_eq!(stats.errors, 1);
		assert_eq!(stats.bytes_sent, 100);
		assert_eq!(stats.bytes_received, 50);
		assert_eq!(stats.avg_latency_us, 1000);

		// A second sample moves the average to the mean of both.
		stats.record_sent(50, 3000);
		assert_eq!(stats.messages_sent, 2);
		assert_eq!(stats.bytes_sent, 150);
		assert_eq!(stats.avg_latency_us, 2000);
	}
}