grove/Protocol/
SpineConnection.rs1use std::{collections::HashMap, sync::Arc};
38
39use anyhow::{Context, Result};
40use serde::{Deserialize, Serialize};
41use tokio::sync::RwLock;
42use tracing::{debug, info, instrument, warn};
43
44use crate::Protocol::{MessageType, ProtocolConfig};
45#[cfg(feature = "grove_echo")]
46use crate::vine::generated::vine::{
47 EchoAction,
48 EchoActionResponse,
49 echo_action_service_client::EchoActionServiceClient,
50};
51
/// Lifecycle state of the Spine connection.
///
/// The type is a plain fieldless enum, so it is `Copy` and supports total
/// equality and hashing (usable directly as a map/set key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// No connection is established.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is in an error state.
    Error,
}
64
/// Heartbeat settings stored on a connection.
///
/// NOTE(review): no code in this file reads these values yet; the field
/// meanings below are taken from the field names — confirm against the
/// heartbeat implementation.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Heartbeat interval, expressed in seconds.
    pub interval_seconds:u64,

    /// Number of missed heartbeats tolerated (presumably before the
    /// connection is treated as lost — TODO confirm).
    pub max_missed:u32,

    /// Whether heartbeating is switched on.
    pub enabled:bool,
}
75
76impl Default for HeartbeatConfig {
78 fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
79}
80
/// Counters describing the activity of a connection.
///
/// Every counter starts at zero (`Default`).
#[derive(Debug, Default, Clone)]
pub struct ConnectionMetrics {
    /// Number of requests counted so far.
    pub total_requests:u64,

    /// Number of requests that completed successfully.
    pub successful_requests:u64,

    /// Number of requests that failed.
    pub failed_requests:u64,

    /// Connection uptime in seconds.
    /// NOTE(review): nothing in this file updates this counter — confirm
    /// where it is maintained.
    pub uptime_seconds:u64,

    /// Number of reconnections.
    /// NOTE(review): nothing in this file updates this counter — confirm
    /// where it is maintained.
    pub reconnections:u64,
}
95
96pub struct SpineConnectionImpl {
98config:Arc<RwLock<ProtocolConfig>>,
100state:Arc<RwLock<ConnectionState>>,
102
103#[cfg(feature = "grove_echo")]
104echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
106
107heartbeat_config:HeartbeatConfig,
109last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
111metrics:Arc<RwLock<ConnectionMetrics>>,
113}
114
115impl SpineConnectionImpl {
116#[instrument(skip(config))]
126pub fn new(config:ProtocolConfig) -> Self {
127 Self {
128 config:Arc::new(RwLock::new(config)),
129 state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
130
131 #[cfg(feature = "grove_echo")]
132 echo_client:None,
133
134 heartbeat_config:HeartbeatConfig::default(),
135 last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
136 metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
137 }
138 }
139
140 #[instrument(skip(self))]
142 pub async fn Connect(&mut self) -> Result<()> {
143 let guard = self.config.read().await;
144 let url = guard.mountain_endpoint.clone();
145 drop(guard);
146
147 info!("Connecting to Spine at: {}", url);
148 *self.state.write().await = ConnectionState::Connecting;
149 *self.state.write().await = ConnectionState::Connected;
150 *self.last_heartbeat.write().await = chrono::Utc::now();
151 info!("Successfully connected to Spine");
152 Ok(())
153 }
154
155 #[instrument(skip(self))]
157 pub async fn Disconnect(&mut self) -> Result<()> {
158 info!("Disconnecting from Spine");
159
160 #[cfg(feature = "grove_echo")]
161 {
162 self.echo_client = None;
163 }
164
165 *self.state.write().await = ConnectionState::Disconnected;
166 info!("Successfully disconnected from Spine");
167 Ok(())
168 }
169
170 pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
172
173 #[instrument(skip(self, payload))]
180 pub async fn SendRequest(&self, method:&str, payload:Vec<u8>) -> Result<Vec<u8>> {
181 if self.GetState().await != ConnectionState::Connected {
182 return Err(anyhow::anyhow!("Not connected to Spine"));
183 }
184
185 debug!("Sending request: {}", method);
186
187 let mut metrics = self.metrics.write().await;
188 metrics.total_requests += 1;
189 metrics.successful_requests += 1;
190 Ok(Vec::new())
191 }
192
193 pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
195
196 pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
198}
199
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
    /// Opens a gRPC channel to the configured `mountain_endpoint` and stores
    /// an EchoAction client built on it.
    ///
    /// # Errors
    ///
    /// Returns an error when the endpoint is not a valid URI or when the
    /// channel cannot be connected.
    #[instrument(skip(self))]
    pub async fn ConnectEchoClient(&mut self) -> Result<()> {
        // The read guard is a temporary, so the config lock is released
        // before the network connect is awaited.
        let url = self.config.read().await.mountain_endpoint.clone();

        info!("Connecting EchoAction client to: {}", url);

        let channel = tonic::transport::Channel::from_shared(url)
            .context("Invalid Mountain URL")?
            .connect()
            .await
            .context("Failed to connect EchoAction client")?;

        self.echo_client = Some(EchoActionServiceClient::new(channel));
        info!("EchoAction client connected");
        Ok(())
    }

    /// Sends `action` through the EchoAction client and returns the response.
    ///
    /// # Errors
    ///
    /// Returns an error when the connection state is not `Connected`, when no
    /// EchoAction client has been connected, when the RPC itself fails, or
    /// when the response reports `success == false`.
    #[instrument(skip(self, action))]
    pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
        if self.GetState().await != ConnectionState::Connected {
            anyhow::bail!("Not connected to Spine");
        }

        // tonic-generated RPC methods take `&mut self`, which a shared
        // `&self` borrow of the stored client cannot provide. Cloning the
        // client is the documented cheap way to get a mutable handle: the
        // clone shares the same underlying channel.
        let mut client = self
            .echo_client
            .clone()
            .ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

        let response = client
            .send_echo_action(action)
            .await
            .context("Failed to send EchoAction")?
            .into_inner();

        if !response.success {
            anyhow::bail!("EchoAction failed: {}", response.error);
        }

        Ok(response)
    }

    /// Sends an RPC call as an EchoAction of type `"rpc"` and returns the
    /// `result` bytes of the response.
    ///
    /// `method` is added to `metadata` under the `"rpc_method"` header,
    /// replacing any existing entry with that key.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::SendEchoAction`].
    pub async fn SendRpcViaEcho(
        &self,
        method:&str,
        payload:Vec<u8>,
        metadata:HashMap<String, String>,
    ) -> Result<Vec<u8>> {
        let mut headers = metadata;
        headers.insert("rpc_method".to_string(), method.to_string());

        let action = Self::BuildEchoAction("rpc", payload, headers);
        let response = self.SendEchoAction(action).await?;
        Ok(response.result)
    }

    /// Sends an event as an EchoAction of type `"event"`.
    ///
    /// `event_name` is added to `metadata` under the `"event_name"` header,
    /// replacing any existing entry with that key.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::SendEchoAction`].
    pub async fn SendEventViaEcho(
        &self,
        event_name:&str,
        payload:Vec<u8>,
        metadata:HashMap<String, String>,
    ) -> Result<()> {
        let mut headers = metadata;
        headers.insert("event_name".to_string(), event_name.to_string());

        let action = Self::BuildEchoAction("event", payload, headers);
        self.SendEchoAction(action).await?;
        Ok(())
    }

    /// Returns `true` when an EchoAction client is currently stored.
    pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

    /// Builds a grove-to-mountain `EchoAction` of the given type with a fresh
    /// UUID, the current UTC timestamp and no nested actions.
    fn BuildEchoAction(
        action_type:&str,
        payload:Vec<u8>,
        headers:HashMap<String, String>,
    ) -> EchoAction {
        EchoAction {
            action_id:uuid::Uuid::new_v4().to_string(),
            source:"grove".to_string(),
            target:"mountain".to_string(),
            action_type:action_type.to_string(),
            payload,
            headers,
            timestamp:chrono::Utc::now().timestamp(),
            nested_actions:vec![],
        }
    }
}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// A state value compares equal to the same variant.
    #[test]
    fn test_connection_state() {
        let current = ConnectionState::Connected;

        assert_eq!(current, ConnectionState::Connected);
    }

    /// The default heartbeat settings use a 30-second interval and are
    /// enabled.
    #[test]
    fn test_heartbeat_config_default() {
        let HeartbeatConfig { interval_seconds, enabled, .. } = HeartbeatConfig::default();

        assert_eq!(interval_seconds, 30);
        assert!(enabled);
    }

    /// A freshly constructed connection starts out disconnected.
    #[tokio::test]
    async fn test_spine_connection_creation() {
        let endpoint = "http://127.0.0.1:50051".to_string();
        let spine = SpineConnectionImpl::new(ProtocolConfig {
            mountain_endpoint:endpoint,
            ..Default::default()
        });

        let observed = spine.GetState().await;
        assert_eq!(observed, ConnectionState::Disconnected);
    }
}