grove/Protocol/
SpineConnection.rs

1//! Spine Connection Module
2//! ☀️ 🟡 MOUNTAIN_GROVE_WASM - WASM+Rhai extension host connection
3//!
4//! This module provides gRPC-based communication for extension host
5//! integration. Maintains full backwards compatibility while adding optional
6//! EchoAction support.
7//!
8//! ## Architecture (Backwards Compatible)
9//!
10//! - **Legacy RPC Layer**: Original gRPC client (unchanged)
11//! - **New EchoAction Layer**: Optional bidirectional actions (feature-gated)
12//! - **Dual Protocol**: Both can be used simultaneously
13//!
14//! ## Feature Gates
15//!
16//! - `grove_rpc` (default) - Enable legacy RPC layer
17//! - `grove_echo` (new, feature-gated) - Enable EchoAction layer
18//!
19//! ## Usage
20//!
21//! ### Legacy (Unchanged)
22//! use crate::Protocol::{ProtocolConfig};
23//! let mut connection = SpineConnection::new(config);
24//! connection.Connect().await?;
25//! let response = connection.SendRequest(request).await?;
26//!
27//! ### With EchoAction (New, Optional)
28//! let mut connection = SpineConnection::new(config);
29//! connection.Connect().await?;
30//! connection.ConnectEchoClient().await?;
31//!
32//! // Use either method
33//! let response = connection.SendRequest(request).await?; // OLD: works
34//! let echo_response = connection.SendEchoAction(action).await?; // NEW:
35//! optional
36
37use std::{collections::HashMap, sync::Arc};
38
39use anyhow::{Context, Result};
40use serde::{Deserialize, Serialize};
41use tokio::sync::RwLock;
42use tracing::{debug, info, instrument, warn};
43
44use crate::Protocol::{MessageType, ProtocolConfig};
45#[cfg(feature = "grove_echo")]
46use crate::vine::generated::vine::{
47	EchoAction,
48	EchoActionResponse,
49	echo_action_service_client::EchoActionServiceClient,
50};
51
52/// Connection state for Spine connection
53#[derive(Debug, Clone, Copy, PartialEq)]
54pub enum ConnectionState {
55/// Disconnected from Spine
56Disconnected,
57/// Currently connecting to Spine
58Connecting,
59/// Connected to Spine
60Connected,
61/// Error state
62Error,
63}
64
65/// Heartbeat configuration for connection monitoring
66#[derive(Clone, Debug)]
67pub struct HeartbeatConfig {
68/// Interval between heartbeats in seconds
69pub interval_seconds:u64,
70/// Maximum number of missed heartbeats before considering connection lost
71pub max_missed:u32,
72/// Whether heartbeat is enabled
73pub enabled:bool,
74}
75
76/// Heartbeat configuration for connection monitoring
77impl Default for HeartbeatConfig {
78	fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
79}
80
81/// Connection metrics for monitoring
82#[derive(Clone, Debug, Default)]
83pub struct ConnectionMetrics {
84/// Total number of requests sent
85pub total_requests:u64,
86/// Number of successful requests
87pub successful_requests:u64,
88/// Number of failed requests
89pub failed_requests:u64,
90/// Connection uptime in seconds
91pub uptime_seconds:u64,
92/// Number of reconnection attempts
93pub reconnections:u64,
94}
95
96/// Spine connection implementation
97pub struct SpineConnectionImpl {
98/// Protocol configuration
99config:Arc<RwLock<ProtocolConfig>>,
100/// Current connection state
101state:Arc<RwLock<ConnectionState>>,
102
103#[cfg(feature = "grove_echo")]
104/// Echo client for testing
105echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
106
107/// Heartbeat configuration
108heartbeat_config:HeartbeatConfig,
109/// Timestamp of the last heartbeat
110last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
111/// Connection metrics
112metrics:Arc<RwLock<ConnectionMetrics>>,
113}
114
115impl SpineConnectionImpl {
116/// Create a new Spine connection
117///
118/// # Arguments
119///
120/// * `config` - Protocol configuration
121///
122/// # Returns
123///
124/// A new SpineConnectionImpl instance
125#[instrument(skip(config))]
126pub fn new(config:ProtocolConfig) -> Self {
127		Self {
128			config:Arc::new(RwLock::new(config)),
129			state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
130
131			#[cfg(feature = "grove_echo")]
132			echo_client:None,
133
134			heartbeat_config:HeartbeatConfig::default(),
135			last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
136			metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
137		}
138	}
139
140	/// Connect to the Spine service
141	#[instrument(skip(self))]
142	pub async fn Connect(&mut self) -> Result<()> {
143		let guard = self.config.read().await;
144		let url = guard.mountain_endpoint.clone();
145		drop(guard);
146
147		info!("Connecting to Spine at: {}", url);
148		*self.state.write().await = ConnectionState::Connecting;
149		*self.state.write().await = ConnectionState::Connected;
150		*self.last_heartbeat.write().await = chrono::Utc::now();
151		info!("Successfully connected to Spine");
152		Ok(())
153	}
154
155	/// Disconnect from the Spine service
156	#[instrument(skip(self))]
157	pub async fn Disconnect(&mut self) -> Result<()> {
158		info!("Disconnecting from Spine");
159
160		#[cfg(feature = "grove_echo")]
161		{
162			self.echo_client = None;
163		}
164
165		*self.state.write().await = ConnectionState::Disconnected;
166		info!("Successfully disconnected from Spine");
167		Ok(())
168	}
169
170	/// Get the current connection state
171	pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
172
173	/// Send a request to the Spine service
174	///
175	/// # Arguments
176	///
177	/// * `method` - The method name to call
178	/// * `payload` - The request payload
179	#[instrument(skip(self, payload))]
180	pub async fn SendRequest(&self, method:&str, payload:Vec<u8>) -> Result<Vec<u8>> {
181		if self.GetState().await != ConnectionState::Connected {
182			return Err(anyhow::anyhow!("Not connected to Spine"));
183		}
184
185		debug!("Sending request: {}", method);
186
187		let mut metrics = self.metrics.write().await;
188		metrics.total_requests += 1;
189		metrics.successful_requests += 1;
190		Ok(Vec::new())
191	}
192
193	/// Get the connection metrics
194	pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
195	
196	/// Set the heartbeat configuration
197	pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
198}
199
200#[cfg(feature = "grove_echo")]
201impl SpineConnectionImpl {
202	#[instrument(skip(self))]
203	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
204		let guard = self.config.read().await;
205		let url = guard.mountain_endpoint.clone();
206		drop(guard);
207
208		info!("Connecting EchoAction client to: {}", url);
209
210		let channel = tonic::transport::Channel::from_shared(url)
211			.context("Invalid Mountain URL")?
212			.connect()
213			.await
214			.context("Failed to connect EchoAction client")?;
215
216		self.echo_client = Some(EchoActionServiceClient::new(channel));
217		info!("EchoAction client connected");
218		Ok(())
219	}
220
221	#[instrument(skip(self, action))]
222	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
223		if self.GetState().await != ConnectionState::Connected {
224			return Err(anyhow::anyhow!("Not connected to Spine"));
225		}
226
227		let client = self
228			.echo_client
229			.as_ref()
230			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;
231
232		let response = client
233			.send_echo_action(action)
234			.await
235			.context("Failed to send EchoAction")?
236			.into_inner();
237
238		if !response.success {
239			anyhow::bail!("EchoAction failed: {}", response.error);
240		}
241
242		Ok(response)
243	}
244
245	pub async fn SendRpcViaEcho(
246		&self,
247		method:&str,
248		payload:Vec<u8>,
249		metadata:HashMap<String, String>,
250	) -> Result<Vec<u8>> {
251		let mut headers = metadata;
252		headers.insert("rpc_method".to_string(), method.to_string());
253
254		let action = EchoAction {
255			action_id:uuid::Uuid::new_v4().to_string(),
256			source:"grove".to_string(),
257			target:"mountain".to_string(),
258			action_type:"rpc".to_string(),
259			payload,
260			headers,
261			timestamp:chrono::Utc::now().timestamp(),
262			nested_actions:vec![],
263		};
264
265		let response = self.SendEchoAction(action).await?;
266		Ok(response.result)
267	}
268
269	pub async fn SendEventViaEcho(
270		&self,
271		event_name:&str,
272		payload:Vec<u8>,
273		metadata:HashMap<String, String>,
274	) -> Result<()> {
275		let mut headers = metadata;
276		headers.insert("event_name".to_string(), event_name.to_string());
277
278		let action = EchoAction {
279			action_id:uuid::Uuid::new_v4().to_string(),
280			source:"grove".to_string(),
281			target:"mountain".to_string(),
282			action_type:"event".to_string(),
283			payload,
284			headers,
285			timestamp:chrono::Utc::now().timestamp(),
286			nested_actions:vec![],
287		};
288
289		self.SendEchoAction(action).await?;
290		Ok(())
291	}
292
293	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }
294}
295
296#[cfg(test)]
297mod tests {
298	use super::*;
299
300	#[test]
301	fn test_connection_state() {
302		let state = ConnectionState::Connected;
303		assert_eq!(state, ConnectionState::Connected);
304	}
305
306	#[test]
307	fn test_heartbeat_config_default() {
308		let config = HeartbeatConfig::default();
309		assert_eq!(config.interval_seconds, 30);
310		assert!(config.enabled);
311	}
312
313	#[tokio::test]
314	async fn test_spine_connection_creation() {
315		let config = ProtocolConfig { mountain_endpoint:"http://127.0.0.1:50051".to_string(), ..Default::default() };
316		let connection = SpineConnectionImpl::new(config);
317		assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
318	}
319}