Echo/Queue/
StealingQueue.rs

1//! # StealingQueue
2//!
3//! A generic, priority-aware, work-stealing queue implementation. This module
4//! is self-contained and can be used by any scheduler or application to manage
5//! and distribute tasks of any type that can be prioritized.
6
7use std::sync::Arc;
8
9use crossbeam_deque::{Injector, Steal, Stealer, Worker};
10use rand::RngExt;
11
/// Contract for items that can report their own scheduling priority.
///
/// The trait leaves the representation of a priority up to the implementor
/// through the associated `Kind` type. `StealingQueue` further restricts its
/// tasks to implementations whose `Kind` is `Priority`.
pub trait Prioritized {
	/// The value used to express a priority. It must be `Copy` and support
	/// total equality (`Eq`, which already implies `PartialEq`).
	type Kind: Copy + Eq;

	/// Returns the priority currently associated with this item.
	fn Rank(&self) -> Self::Kind;
}
20
/// The three priority levels understood by the queue.
///
/// Each level has its own set of deques. Throughout this module the levels
/// are stored as three-element tuples, and a variant selects one slot of
/// those tuples.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Priority {
	/// Selects slot `.0`; this level is searched first by workers.
	High,
	/// Selects slot `.1`; searched after `High`.
	Normal,
	/// Selects slot `.2`; searched last.
	Low,
}
31
32/// Holds the queue components that are safe to share across all threads.
33///
34/// This includes global injectors for submitting new tasks from any context and
35/// stealers for taking tasks from other workers' deques, organized by priority
36/// level.
37pub struct Share<TTask> {
38	/// Global, multi-producer queues for each priority level.
39	pub Injector:(Injector<TTask>, Injector<TTask>, Injector<TTask>),
40
41	/// Shared handles for stealing tasks from each worker's local queue.
42	pub Stealer:(Vec<Stealer<TTask>>, Vec<Stealer<TTask>>, Vec<Stealer<TTask>>),
43}
44
45/// A generic, priority-aware, work-stealing queue.
46///
47/// This is the public-facing entry point for submitting tasks. It is generic
48/// over any task type `TTask` that implements the `Prioritized` trait.
49pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
50	/// A shared, thread-safe pointer to the queue's shared components.
51	Share:Arc<Share<TTask>>,
52}
53
54/// Contains all necessary components for a single worker thread to operate.
55///
56/// This includes the thread-local `Worker` deques, which are not safe to share,
57
58/// making this `Context` object the sole owner of a worker's private queues.
59pub struct Context<TTask> {
60	/// A unique identifier for the worker, used to avoid self-stealing.
61	pub Identifier:usize,
62
63	/// Thread-local work queues for each priority level.
64	pub Local:(Worker<TTask>, Worker<TTask>, Worker<TTask>),
65
66	/// A reference to the shared components of the entire queue system.
67	pub Share:Arc<Share<TTask>>,
68}
69
70impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
71	/// Creates a complete work-stealing queue system.
72	///
73	/// This function initializes all the necessary queues, both shared and
74	/// thread-local, for a given number of workers.
75	///
76	/// # Returns
77	///
78	/// A tuple containing:
79	/// 1. The public-facing `StealingQueue` for submitting new tasks.
80	/// 2. A `Vec` of `Context` objects, one for each worker thread to own.
81	pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
82		let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
83
84		let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
85
86		let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
87
88		// For each priority level, create a thread-local worker queue and its
89		// corresponding shared stealer.
90		let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
91			.map(|_| {
92				let Worker = Worker::new_fifo();
93
94				let Stealer = Worker.stealer();
95
96				High.push(Worker);
97
98				Stealer
99			})
100			.collect();
101
102		let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
103			.map(|_| {
104				let Worker = Worker::new_fifo();
105
106				let Stealer = Worker.stealer();
107
108				Normal.push(Worker);
109
110				Stealer
111			})
112			.collect();
113
114		let StealerLow:Vec<Stealer<TTask>> = (0..Count)
115			.map(|_| {
116				let Worker = Worker::new_fifo();
117
118				let Stealer = Worker.stealer();
119
120				Low.push(Worker);
121
122				Stealer
123			})
124			.collect();
125
126		// Bundle all shared components into an Arc for safe sharing.
127		let Share = Arc::new(Share {
128			Injector:(Injector::new(), Injector::new(), Injector::new()),
129
130			Stealer:(StealerHigh, StealerNormal, StealerLow),
131		});
132
133		// Create a unique context for each worker, giving it ownership of its
134		// local queues and a reference to the shared components.
135		let mut Contexts = Vec::with_capacity(Count);
136
137		for Identifier in 0..Count {
138			Contexts.push(Context {
139				Identifier,
140
141				Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
142
143				Share:Share.clone(),
144			});
145		}
146
147		let Queue = Self { Share };
148
149		(Queue, Contexts)
150	}
151
152	/// Submits a new task to the appropriate global queue based on its
153	/// priority. This method is thread-safe and can be called from any
154	/// context.
155	pub fn Submit(&self, Task:TTask) {
156		match Task.Rank() {
157			Priority::High => self.Share.Injector.0.push(Task),
158
159			Priority::Normal => self.Share.Injector.1.push(Task),
160
161			Priority::Low => self.Share.Injector.2.push(Task),
162		}
163	}
164}
165
166impl<TTask> Context<TTask> {
167	/// Finds the next available task for the worker to execute.
168	///
169	/// This method implements the complete work-finding logic:
170	/// 1. Check local deques (from high to low priority).
171	/// 2. If local deques are empty, attempt to steal from the system (from
172	///    high to low priority).
173	pub fn Next(&self) -> Option<TTask> {
174		self.Local
175			.0
176			.pop()
177			.or_else(|| self.Local.1.pop())
178			.or_else(|| self.Local.2.pop())
179			.or_else(|| self.Steal(&self.Share.Injector.0, &self.Share.Stealer.0, &self.Local.0))
180			.or_else(|| self.Steal(&self.Share.Injector.1, &self.Share.Stealer.1, &self.Local.1))
181			.or_else(|| self.Steal(&self.Share.Injector.2, &self.Share.Stealer.2, &self.Local.2))
182	}
183
184	/// Attempts to steal a task from a specific priority set.
185	///
186	/// It first tries to steal a batch from the global injector queue for that
187	/// priority. If that fails, it attempts to steal from a randomly chosen
188	/// peer worker to ensure fair distribution and avoid contention hotspots.
189	pub fn Steal<'a>(
190		&self,
191
192		Injector:&'a Injector<TTask>,
193
194		Stealers:&'a [Stealer<TTask>],
195
196		Local:&'a Worker<TTask>,
197	) -> Option<TTask> {
198		// First, try to steal a batch from the global injector.
199		// `steal_batch_and_pop` is efficient: it moves a batch into our local
200		// queue and returns one task immediately if successful.
201		if let Steal::Success(Task) = Injector.steal_batch_and_pop(Local) {
202			return Some(Task);
203		}
204
205		// If the global queue is empty, try stealing from peers.
206		let Count = Stealers.len();
207
208		if Count <= 1 {
209			// Cannot steal if there are no other workers.
210			return None;
211		}
212
213		// Allocation-free random iteration: pick a random starting point.
214		let mut Rng = rand::rng();
215
216		let Start = Rng.random_range(0..Count);
217
218		// Iterate through all peers starting from the random offset.
219		for i in 0..Count {
220			let Index = (Start + i) % Count;
221
222			// Don't steal from ourselves.
223			if Index == self.Identifier {
224				continue;
225			}
226
227			if let Steal::Success(Task) = Stealers[Index].steal_batch_and_pop(Local) {
228				return Some(Task);
229			}
230		}
231
232		None
233	}
234}