Mountain/RunTime/Execute/
Fn.rs1use std::sync::Arc;
46
47use CommonLibrary::{
48 Effect::{ActionEffect::ActionEffect, ApplicationRunTime::ApplicationRunTime as ApplicationRunTimeTrait},
49 Environment::Requires::Requires,
50 Error::CommonError::CommonError,
51};
52use Echo::Task::Priority::Priority;
53use async_trait::async_trait;
54use log::{error, warn};
55
56use crate::RunTime::ApplicationRunTime::ApplicationRunTime;
57
58#[async_trait]
61impl ApplicationRunTimeTrait for ApplicationRunTime {
62 async fn Run<TCapabilityProvider, TError, TOutput>(
63 &self,
64 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
65 ) -> Result<TOutput, TError>
66 where
67 TCapabilityProvider: ?Sized + Send + Sync + 'static,
68 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
69 Requires<TCapabilityProvider>,
70 TError: From<CommonError> + Send + Sync + 'static,
71 TOutput: Send + Sync + 'static, {
72 let (ResultSender, ResultReceiver) = tokio::sync::oneshot::channel::<Result<TOutput, TError>>();
73
74 let CapabilityProvider:Arc<TCapabilityProvider> = self.Environment.Require();
75
76 let Task = async move {
77 let Result = Effect.Apply(CapabilityProvider).await;
78
79 if ResultSender.send(Result).is_err() {
80 error!("[ApplicationRunTime] Failed to send effect result; receiver was dropped.");
81 }
82 };
83
84 self.Scheduler.Submit(Task, Priority::Normal);
85
86 match ResultReceiver.await {
87 Ok(Result) => Result,
88
89 Err(_RecvError) => {
90 let Message = "Effect execution canceled; oneshot channel closed.".to_string();
91
92 error!("{}", Message);
93
94 Err(CommonError::IPCError { Description:Message }.into())
95 },
96 }
97 }
98}
99
100impl ApplicationRunTime {
101 pub async fn RunWithTimeout<TCapabilityProvider, TError, TOutput>(
103 &self,
104 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
105 timeout:std::time::Duration,
106 ) -> Result<TOutput, TError>
107 where
108 TCapabilityProvider: ?Sized + Send + Sync + 'static,
109 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
110 Requires<TCapabilityProvider>,
111 TError: From<CommonError> + Send + Sync + 'static,
112 TOutput: Send + Sync + 'static, {
113 tokio::time::timeout(timeout, ApplicationRunTimeTrait::Run(self, Effect))
114 .await
115 .map_err(|_| {
116 CommonError::Unknown { Description:format!("Effect execution timed out after {:?}", timeout) }.into()
117 })?
118 }
119
120 pub async fn RunWithRetry<TCapabilityProvider, TError, TOutput>(
122 &self,
123 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
124 max_retries:u32,
125 initial_delay:std::time::Duration,
126 ) -> Result<TOutput, TError>
127 where
128 TCapabilityProvider: ?Sized + Send + Sync + 'static,
129 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
130 Requires<TCapabilityProvider>,
131 TError: From<CommonError> + Send + Sync + 'static + std::fmt::Display,
132 TOutput: Send + Sync + 'static, {
133 let mut retry_count = 0;
134 let mut current_delay = initial_delay;
135
136 while retry_count <= max_retries {
137 match ApplicationRunTimeTrait::Run(self, Effect.clone()).await {
138 Ok(result) => return Ok(result),
139 Err(error) => {
140 if retry_count == max_retries {
141 return Err(error);
142 }
143
144 retry_count += 1;
145 warn!(
146 "[ApplicationRunTime] Effect execution failed (attempt {}): {}. Retrying in {:?}...",
147 retry_count, error, current_delay
148 );
149
150 tokio::time::sleep(current_delay).await;
151
152 current_delay *= 2;
155 },
156 }
157 }
158
159 Err(
160 CommonError::Unknown { Description:format!("Effect execution failed after {} retries", max_retries) }
161 .into(),
162 )
163 }
164}