1use std::{
51 collections::{HashMap, hash_map::DefaultHasher},
52 sync::Arc,
53 time::{Duration, Instant},
54};
55
56use lazy_static::lazy_static;
57use log::{debug, error, info, warn};
58use parking_lot::Mutex;
59use serde_json::{Value, from_slice, to_vec};
60use tokio::time::timeout;
61
62use super::{
63 Error::VineError,
64 Generated::{GenericNotification, GenericRequest, cocoon_service_client::CocoonServiceClient},
65};
66
67type CocoonClient = CocoonServiceClient<tonic::transport::Channel>;
69
70mod Config {
72 pub const DEFAULT_TIMEOUT_MS:u64 = 5000;
74
75 pub const MAX_RETRY_ATTEMPTS:usize = 3;
77
78 pub const RETRY_BASE_DELAY_MS:u64 = 100;
80
81 pub const MAX_MESSAGE_SIZE_BYTES:usize = 4 * 1024 * 1024;
83
84 pub const HEALTH_CHECK_INTERVAL_MS:u64 = 30000;
86
87 pub const CONNECTION_TIMEOUT_MS:u64 = 10000;
89}
90
91struct ConnectionMetadata {
93 LastActivity:Instant,
95 FailureCount:usize,
97 IsHealthy:bool,
99}
100
101lazy_static! {
102 static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));
104
105 static ref CONNECTION_METADATA: Arc<Mutex<HashMap<String, ConnectionMetadata>>> = Arc::new(Mutex::new(HashMap::new()));
107}
108
109pub async fn ConnectToSideCar(SideCarIdentifier:String, Address:String) -> Result<(), VineError> {
132 info!("[VineClient] Connecting to sidecar '{}' at '{}'...", SideCarIdentifier, Address);
133
134 let endpoint = format!("http://{}", Address);
135
136 if endpoint.len() > 256 {
138 return Err(VineError::RPCError(format!("Invalid endpoint address: exceeds maximum length")));
139 }
140
141 let mut last_error = None;
143
144 for attempt in 1..=Config::MAX_RETRY_ATTEMPTS {
145 let result = try_connect_single(&SideCarIdentifier, &endpoint).await;
146
147 if result.is_ok() {
148 CONNECTION_METADATA.lock().insert(
150 SideCarIdentifier.clone(),
151 ConnectionMetadata { LastActivity:Instant::now(), FailureCount:0, IsHealthy:true },
152 );
153
154 info!("[VineClient] Successfully connected to sidecar '{}'", SideCarIdentifier);
155
156 return Ok(result?);
157 }
158
159 last_error = Some(result.unwrap_err());
161
162 if attempt < Config::MAX_RETRY_ATTEMPTS {
164 let delay_ms = Config::RETRY_BASE_DELAY_MS * 2_u64.pow(attempt as u32);
165 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
166 }
167 }
168
169 Err(last_error.unwrap_or_else(|| VineError::RPCError("Connection failed".to_string())))
170}
171
172async fn try_connect_single(_SideCarIdentifier:&str, endpoint:&str) -> Result<(), VineError> {
174 let endpoint_url = if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
175 endpoint.to_string()
176 } else {
177 format!("http://{}", endpoint)
178 };
179
180 let channel = tonic::transport::Channel::from_shared(endpoint_url)
181 .map_err(|e| VineError::RPCError(format!("Failed to create channel: {}", e)))?
182 .connect()
183 .await
184 .map_err(|e| VineError::RPCError(format!("Failed to connect: {}", e)))?;
185
186 let client = CocoonClient::new(channel);
187
188 let mut clients = SIDECAR_CLIENTS.lock();
189 clients.insert(_SideCarIdentifier.to_string(), client);
190
191 Ok(())
192}
193
194pub fn DisconnectFromSideCar(SideCarIdentifier:String) -> Result<(), VineError> {
215 let mut clients = SIDECAR_CLIENTS.lock();
216
217 if clients.remove(&SideCarIdentifier).is_some() {
218 CONNECTION_METADATA.lock().remove(&SideCarIdentifier);
219
220 info!("[VineClient] Disconnected from sidecar '{}'", SideCarIdentifier);
221
222 Ok(())
223 } else {
224 Err(VineError::ClientNotConnected(SideCarIdentifier))
225 }
226}
227
228pub fn CheckSideCarHealth(SideCarIdentifier:&str) -> Result<bool, VineError> {
252 let metadata = CONNECTION_METADATA.lock();
253
254 if let Some(conn) = metadata.get(SideCarIdentifier) {
255 let is_stale = conn.LastActivity.elapsed() > Duration::from_millis(Config::HEALTH_CHECK_INTERVAL_MS);
256 let has_many_failures = conn.FailureCount > Config::MAX_RETRY_ATTEMPTS;
257
258 Ok(conn.IsHealthy && !is_stale && !has_many_failures)
259 } else {
260 Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()))
261 }
262}
263
264fn RecordSideCarFailure(SideCarIdentifier:&str) {
271 let mut metadata = CONNECTION_METADATA.lock();
272
273 if let Some(conn) = metadata.get_mut(SideCarIdentifier) {
274 conn.FailureCount += 1;
275 conn.IsHealthy = false;
276 }
277}
278
279fn UpdateSideCarActivity(SideCarIdentifier:&str) {
286 let mut metadata = CONNECTION_METADATA.lock();
287
288 if let Some(conn) = metadata.get_mut(SideCarIdentifier) {
289 conn.LastActivity = Instant::now();
290 conn.FailureCount = 0;
291 conn.IsHealthy = true;
292 }
293}
294
295fn ValidateMessageSize(data:&[u8]) -> Result<(), VineError> {
306 if data.len() > Config::MAX_MESSAGE_SIZE_BYTES {
307 Err(VineError::MessageTooLarge { ActualSize:data.len(), MaxSize:Config::MAX_MESSAGE_SIZE_BYTES })
308 } else {
309 Ok(())
310 }
311}
312
313pub async fn SendRequest(
340 SideCarIdentifier:&str,
341 Method:String,
342 Parameters:Value,
343 TimeoutMilliseconds:u64,
344) -> Result<Value, VineError> {
345 if Method.is_empty() || Method.len() > 128 {
347 return Err(VineError::RPCError(
348 "Method name must be between 1 and 128 characters".to_string(),
349 ));
350 }
351
352 let timeout_duration = Duration::from_millis(if TimeoutMilliseconds > 0 {
353 TimeoutMilliseconds
354 } else {
355 Config::DEFAULT_TIMEOUT_MS
356 });
357
358 let parameter_bytes =
360 to_vec(&Parameters).map_err(|e| VineError::RPCError(format!("Failed to serialize parameters: {}", e)))?;
361 ValidateMessageSize(¶meter_bytes)?;
362
363 let mut client = {
364 let guard = SIDECAR_CLIENTS.lock();
365 guard.get(SideCarIdentifier).cloned()
366 };
367
368 if client.is_none() {
369 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
370 }
371
372 let mut client = client.unwrap();
373
374 let request_identifier = std::time::SystemTime::now()
375 .duration_since(std::time::UNIX_EPOCH)
376 .unwrap()
377 .as_nanos() as u64;
378 let method_clone = Method.clone();
379 let request = GenericRequest { request_identifier, method:Method, parameter:parameter_bytes };
380
381 let result = timeout(timeout_duration, client.process_mountain_request(request)).await;
382
383 match result {
384 Ok(Ok(response)) => {
385 UpdateSideCarActivity(SideCarIdentifier);
386 debug!(
387 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
388 SideCarIdentifier, method_clone
389 );
390
391 let inner_response = response.into_inner();
393
394 let result_bytes = inner_response.result;
396 let result_value:Value = from_slice(&result_bytes)
397 .map_err(|e| VineError::RPCError(format!("Failed to deserialize response: {}", e)))?;
398
399 if let Some(error_data) = inner_response.error {
401 return Err(VineError::RPCError(format!(
402 "RPC error from sidecar: code={}, message={}",
403 error_data.code, error_data.message
404 )));
405 }
406
407 Ok(result_value)
408 },
409 Ok(Err(status)) => {
410 RecordSideCarFailure(SideCarIdentifier);
411 return Err(VineError::RPCError(format!("gRPC error: {}", status)));
412 },
413 Err(_) => {
414 RecordSideCarFailure(SideCarIdentifier);
415 Err(VineError::RequestTimeout {
416 SideCarIdentifier:SideCarIdentifier.to_string(),
417 MethodName:method_clone,
418 TimeoutMilliseconds:timeout_duration.as_millis() as u64,
419 })
420 },
421 }
422}
423
424pub async fn SendNotification(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
442 if Method.is_empty() || Method.len() > 128 {
444 return Err(VineError::RPCError(
445 "Method name must be between 1 and 128 characters".to_string(),
446 ));
447 }
448
449 let parameter_bytes = to_vec(&Parameters)?;
450 ValidateMessageSize(¶meter_bytes)?;
451
452 let mut client = {
453 let guard = SIDECAR_CLIENTS.lock();
454 guard.get(&SideCarIdentifier).cloned()
455 };
456
457 if let Some(ref mut client) = client {
458 let request = GenericNotification { method:Method, parameter:parameter_bytes };
459
460 match client.send_mountain_notification(request).await {
461 Ok(_) => {
462 UpdateSideCarActivity(&SideCarIdentifier);
463 debug!("[VineClient] Notification sent successfully to sidecar '{}'", SideCarIdentifier);
464 Ok(())
465 },
466 Err(status) => {
467 RecordSideCarFailure(&SideCarIdentifier);
468 error!(
469 "[VineClient] Failed to send notification to sidecar '{}': {}",
470 SideCarIdentifier, status
471 );
472 Err(VineError::from(status))
473 },
474 }
475 } else {
476 Err(VineError::ClientNotConnected(SideCarIdentifier))
477 }
478}