Echo/Scheduler/
Scheduler.rs

1//! # Scheduler
2//!
3//! Manages the pool of workers and the task queue system, serving as the main
4//! public interface of the `Echo` library.
5
6use std::{
7	collections::HashMap,
8	future::Future,
9	sync::{
10		Arc,
11		atomic::{AtomicBool, Ordering},
12	},
13};
14
15use log::{error, info, warn};
16use tokio::task::JoinHandle;
17
18use super::{SchedulerBuilder::Concurrency, Worker::Worker};
19use crate::{
20	Queue::StealingQueue::StealingQueue,
21	Task::{Priority::Priority, Task::Task},
22};
23
24/// Manages a pool of worker threads and a work-stealing queue to execute
25/// tasks efficiently. This is the primary public-facing struct of the library.
26pub struct Scheduler {
27	/// The underlying work-stealing queue system used for task submission.
28	Queue:StealingQueue<Task>,
29
30	/// Handles to the spawned worker threads, used for graceful shutdown.
31	WorkerHandles:Vec<JoinHandle<()>>,
32
33	/// An atomic flag to signal all workers to shut down.
34	IsRunning:Arc<AtomicBool>,
35}
36
37impl Scheduler {
38	/// Creates and starts a new scheduler with a given configuration.
39	///
40	/// This is a crate-private function, intended to be called only by the
41	/// `SchedulerBuilder`'s `Build` method.
42	pub(crate) fn Create(WorkerCount:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
43		info!("[Scheduler] Creating scheduler with {} workers.", WorkerCount);
44
45		let IsRunning = Arc::new(AtomicBool::new(true));
46
47		// Create the entire queue system and retrieve the contexts for each worker.
48		let (Queue, Contexts) = StealingQueue::<Task>::Create(WorkerCount);
49
50		let mut WorkerHandles = Vec::with_capacity(WorkerCount);
51
52		// Spawn an asynchronous task for each worker.
53		for Context in Contexts.into_iter() {
54			let IsRunning = IsRunning.clone();
55
56			let WorkerHandle = tokio::spawn(async move {
57				// Each task creates and runs a worker, consuming its context.
58				Worker::Create(Context, IsRunning).Run().await;
59			});
60
61			WorkerHandles.push(WorkerHandle);
62		}
63
64		Self { Queue, WorkerHandles, IsRunning }
65	}
66
67	/// Submits a new task to the scheduler's global queue.
68	///
69	/// The task will be picked up by the next available worker according to its
70	/// priority and the work-stealing logic.
71	pub fn Submit<F>(&self, Operation:F, Priority:Priority)
72	where
73		F: Future<Output = ()> + Send + 'static, {
74		self.Queue.Submit(Task::Create(Operation, Priority));
75	}
76
77	/// Asynchronously shuts down the scheduler.
78	///
79	/// This method signals all worker threads to stop their loops and then
80	/// waits for each one to complete its current task and exit gracefully.
81	pub async fn Stop(&mut self) {
82		if !self.IsRunning.swap(false, Ordering::Relaxed) {
83			info!("[Scheduler] Stop already initiated.");
84
85			return;
86		}
87
88		info!("[Scheduler] Stopping worker threads...");
89
90		for Handle in self.WorkerHandles.drain(..) {
91			if let Err(Error) = Handle.await {
92				error!("[Scheduler] Error joining worker handle: {}", Error);
93			}
94		}
95
96		info!("[Scheduler] All workers stopped successfully.");
97	}
98}
99
100impl Drop for Scheduler {
101	/// Ensures workers are signaled to stop if the `Scheduler` is dropped
102	/// without an explicit call to `Stop`.
103	///
104	/// This prevents orphaned worker threads if the user forgets to manage the
105	/// scheduler's lifecycle properly.
106	fn drop(&mut self) {
107		if self.IsRunning.load(Ordering::Relaxed) {
108			warn!("[Scheduler] Dropped without explicit stop. Signaling workers to terminate.");
109
110			self.IsRunning.store(false, Ordering::Relaxed);
111		}
112	}
113}