Echo/Scheduler/
Worker.rs

1//! # Worker
2//!
3//! Represents a single execution thread in the scheduler's thread pool.
4
5use std::sync::{
6	Arc,
7	atomic::{AtomicBool, Ordering},
8};
9
10use log::trace;
11use tokio::time::{Duration, sleep};
12
13use crate::{Queue::StealingQueue::Context, Task::Task::Task};
14
15/// Represents a worker that executes tasks from its assigned `Context`.
16pub struct Worker {
17	/// The worker's execution context, which contains its private deques and a
18	/// reference to the shared queue system.
19	Context:Context<Task>,
20
21	/// An atomic flag, shared by all workers, to signal a shutdown request.
22	IsRunning:Arc<AtomicBool>,
23}
24
25impl Worker {
26	/// Creates a new `Worker` with its unique execution context and a reference
27	/// to the scheduler's running state.
28	pub fn Create(Context:Context<Task>, IsRunning:Arc<AtomicBool>) -> Self { Self { Context, IsRunning } }
29
30	/// The main execution loop for the worker.
31	///
32	/// This loop continuously tries to find and execute tasks. It prioritizes
33	/// its local queue and, if empty, attempts to steal work from other
34	/// workers or the global queue. If no work is found anywhere, it yields
35	/// briefly to avoid busy-waiting.
36	pub async fn Run(self) {
37		trace!("[Worker {}] Starting run loop.", self.Context.Identifier);
38
39		while self.IsRunning.load(Ordering::Relaxed) {
40			// First, try to get a task from the local deques.
41			let TaskOption = self.PopLocal();
42
43			if let Some(Task) = TaskOption {
44				trace!(
45					"[Worker {}] Executing local task with priority: {:?}.",
46					self.Context.Identifier, Task.Priority
47				);
48
49				Task.Operation.await;
50
51				continue;
52			}
53
54			// If no local work, try to steal from the system.
55			let TaskOption = self.StealFromSystem();
56
57			if let Some(Task) = TaskOption {
58				trace!(
59					"[Worker {}] Executing stolen task with priority: {:?}.",
60					self.Context.Identifier, Task.Priority
61				);
62
63				Task.Operation.await;
64			} else {
65				// If there's truly no work anywhere, yield to the OS.
66				sleep(Duration::from_millis(1)).await;
67			}
68		}
69
70		trace!("[Worker {}] Run loop finished.", self.Context.Identifier);
71	}
72
73	/// Attempts to pop a single task from the local deques, honoring priority
74	/// from high to low.
75	fn PopLocal(&self) -> Option<Task> {
76		self.Context
77			.Local
78			.0
79			.pop()
80			.or_else(|| self.Context.Local.1.pop())
81			.or_else(|| self.Context.Local.2.pop())
82	}
83
84	/// Attempts to steal a batch of work from the system.
85	///
86	/// It steals from the highest-priority queue that has work, populating its
87	/// own local queue and returning the first task immediately for execution.
88	fn StealFromSystem(&self) -> Option<Task> {
89		self.Context
90			.Steal(
91				&self.Context.Share.Injector.0,
92				&self.Context.Share.Stealer.0,
93				&self.Context.Local.0,
94			)
95			.or_else(|| {
96				self.Context.Steal(
97					&self.Context.Share.Injector.1,
98					&self.Context.Share.Stealer.1,
99					&self.Context.Local.1,
100				)
101			})
102			.or_else(|| {
103				self.Context.Steal(
104					&self.Context.Share.Injector.2,
105					&self.Context.Share.Stealer.2,
106					&self.Context.Local.2,
107				)
108			})
109	}
110}