Echo/Scheduler/
Scheduler.rs1use std::{
7 collections::HashMap,
8 future::Future,
9 sync::{
10 Arc,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use log::{error, info, warn};
16use tokio::task::JoinHandle;
17
18use super::{SchedulerBuilder::Concurrency, Worker::Worker};
19use crate::{
20 Queue::StealingQueue::StealingQueue,
21 Task::{Priority::Priority, Task::Task},
22};
23
24pub struct Scheduler {
27 Queue:StealingQueue<Task>,
29
30 WorkerHandles:Vec<JoinHandle<()>>,
32
33 IsRunning:Arc<AtomicBool>,
35}
36
37impl Scheduler {
38 pub(crate) fn Create(WorkerCount:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
43 info!("[Scheduler] Creating scheduler with {} workers.", WorkerCount);
44
45 let IsRunning = Arc::new(AtomicBool::new(true));
46
47 let (Queue, Contexts) = StealingQueue::<Task>::Create(WorkerCount);
49
50 let mut WorkerHandles = Vec::with_capacity(WorkerCount);
51
52 for Context in Contexts.into_iter() {
54 let IsRunning = IsRunning.clone();
55
56 let WorkerHandle = tokio::spawn(async move {
57 Worker::Create(Context, IsRunning).Run().await;
59 });
60
61 WorkerHandles.push(WorkerHandle);
62 }
63
64 Self { Queue, WorkerHandles, IsRunning }
65 }
66
67 pub fn Submit<F>(&self, Operation:F, Priority:Priority)
72 where
73 F: Future<Output = ()> + Send + 'static, {
74 self.Queue.Submit(Task::Create(Operation, Priority));
75 }
76
77 pub async fn Stop(&mut self) {
82 if !self.IsRunning.swap(false, Ordering::Relaxed) {
83 info!("[Scheduler] Stop already initiated.");
84
85 return;
86 }
87
88 info!("[Scheduler] Stopping worker threads...");
89
90 for Handle in self.WorkerHandles.drain(..) {
91 if let Err(Error) = Handle.await {
92 error!("[Scheduler] Error joining worker handle: {}", Error);
93 }
94 }
95
96 info!("[Scheduler] All workers stopped successfully.");
97 }
98}
99
100impl Drop for Scheduler {
101 fn drop(&mut self) {
107 if self.IsRunning.load(Ordering::Relaxed) {
108 warn!("[Scheduler] Dropped without explicit stop. Signaling workers to terminate.");
109
110 self.IsRunning.store(false, Ordering::Relaxed);
111 }
112 }
113}