Mountain/RunTime/Execute/
Fn.rs

1//! # Fn (RunTime::Execute)
2//!
3//! ## RESPONSIBILITIES
4//!
5//! Core effect execution functions that bridge the declarative ActionEffect
6//! system with the Echo scheduler for high-performance task execution.
7//!
8//! ## ARCHITECTURAL ROLE
9//!
10//! The execution engine in Mountain's architecture that handles the "how"
11//! of effect execution, while ActionEffect defines the "what".
12//!
13//! ## KEY COMPONENTS
14//!
15//! - **Run**: Basic effect execution through Echo scheduler
16//! - **RunWithTimeout**: Timeout-based execution with cancellation
17//! - **RunWithRetry**: Retry mechanisms with exponential backoff
18//!
19//! ## ERROR HANDLING
20//!
21//! All errors are propagated through Result<T, E> with detailed context.
22//! Effect errors are converted to CommonError when appropriate.
23//! Timeouts return timeout-specific errors.
24//! Retry failures include attempt count and last error information.
25//!
26//! ## LOGGING
27//!
28//! Uses log crate with appropriate severity levels:
29//! - `info`: Effect submission and completion
30//! - `debug`: Detailed execution steps
31//! - `warn`: Retry attempts and recoverable errors
32//! - `error`: Failed operations and timeout occurrences
33//!
34//! ## PERFORMANCE CONSIDERATIONS
35//!
36//! - Uses oneshot channels for result collection (minimal overhead)
37//! - Tasks are submitted to Echo's work-stealing scheduler
38//! - Timeout uses tokio::time::timeout for efficient cancellation
39//! - Retry with exponential backoff prevents system overload
40//!
41//! ## TODO
42//!
43//! None
44
45use std::sync::Arc;
46
47use CommonLibrary::{
48	Effect::{ActionEffect::ActionEffect, ApplicationRunTime::ApplicationRunTime as ApplicationRunTimeTrait},
49	Environment::Requires::Requires,
50	Error::CommonError::CommonError,
51};
52use Echo::Task::Priority::Priority;
53use async_trait::async_trait;
54use log::{error, warn};
55
56use crate::RunTime::ApplicationRunTime::ApplicationRunTime;
57
58/// The core integration logic between `Common::ActionEffect` and
59/// `Echo::Scheduler`.
60#[async_trait]
61impl ApplicationRunTimeTrait for ApplicationRunTime {
62	async fn Run<TCapabilityProvider, TError, TOutput>(
63		&self,
64		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
65	) -> Result<TOutput, TError>
66	where
67		TCapabilityProvider: ?Sized + Send + Sync + 'static,
68		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
69			Requires<TCapabilityProvider>,
70		TError: From<CommonError> + Send + Sync + 'static,
71		TOutput: Send + Sync + 'static, {
72		let (ResultSender, ResultReceiver) = tokio::sync::oneshot::channel::<Result<TOutput, TError>>();
73
74		let CapabilityProvider:Arc<TCapabilityProvider> = self.Environment.Require();
75
76		let Task = async move {
77			let Result = Effect.Apply(CapabilityProvider).await;
78
79			if ResultSender.send(Result).is_err() {
80				error!("[ApplicationRunTime] Failed to send effect result; receiver was dropped.");
81			}
82		};
83
84		self.Scheduler.Submit(Task, Priority::Normal);
85
86		match ResultReceiver.await {
87			Ok(Result) => Result,
88
89			Err(_RecvError) => {
90				let Message = "Effect execution canceled; oneshot channel closed.".to_string();
91
92				error!("{}", Message);
93
94				Err(CommonError::IPCError { Description:Message }.into())
95			},
96		}
97	}
98}
99
100impl ApplicationRunTime {
101	/// Enhanced effect execution with timeout and recovery.
102	pub async fn RunWithTimeout<TCapabilityProvider, TError, TOutput>(
103		&self,
104		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
105		timeout:std::time::Duration,
106	) -> Result<TOutput, TError>
107	where
108		TCapabilityProvider: ?Sized + Send + Sync + 'static,
109		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
110			Requires<TCapabilityProvider>,
111		TError: From<CommonError> + Send + Sync + 'static,
112		TOutput: Send + Sync + 'static, {
113		tokio::time::timeout(timeout, ApplicationRunTimeTrait::Run(self, Effect))
114			.await
115			.map_err(|_| {
116				CommonError::Unknown { Description:format!("Effect execution timed out after {:?}", timeout) }.into()
117			})?
118	}
119
120	/// Execute effect with retry mechanism.
121	pub async fn RunWithRetry<TCapabilityProvider, TError, TOutput>(
122		&self,
123		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
124		max_retries:u32,
125		initial_delay:std::time::Duration,
126	) -> Result<TOutput, TError>
127	where
128		TCapabilityProvider: ?Sized + Send + Sync + 'static,
129		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
130			Requires<TCapabilityProvider>,
131		TError: From<CommonError> + Send + Sync + 'static + std::fmt::Display,
132		TOutput: Send + Sync + 'static, {
133		let mut retry_count = 0;
134		let mut current_delay = initial_delay;
135
136		while retry_count <= max_retries {
137			match ApplicationRunTimeTrait::Run(self, Effect.clone()).await {
138				Ok(result) => return Ok(result),
139				Err(error) => {
140					if retry_count == max_retries {
141						return Err(error);
142					}
143
144					retry_count += 1;
145					warn!(
146						"[ApplicationRunTime] Effect execution failed (attempt {}): {}. Retrying in {:?}...",
147						retry_count, error, current_delay
148					);
149
150					tokio::time::sleep(current_delay).await;
151
152					// Apply exponential backoff by doubling the delay after each failure
153					// to prevent overwhelming the system during recovery attempts.
154					current_delay *= 2;
155				},
156			}
157		}
158
159		Err(
160			CommonError::Unknown { Description:format!("Effect execution failed after {} retries", max_retries) }
161				.into(),
162		)
163	}
164}