Echo/Queue/StealingQueue.rs
1//! # StealingQueue
2//!
3//! A generic, priority-aware, work-stealing queue implementation. This module
4//! is self-contained and can be used by any scheduler or application to manage
5//! and distribute tasks of any type that can be prioritized.
6
7use std::sync::Arc;
8
9use crossbeam_deque::{Injector, Steal, Stealer, Worker};
10use rand::RngExt;
11
/// Contract for task types that can report the priority class they belong to.
///
/// The queue inspects [`Prioritized::Rank`] when a task is submitted in order
/// to decide which priority lane the task is placed in.
pub trait Prioritized {
    /// The priority value reported by the implementor. It must be cheaply
    /// copyable and comparable for equality.
    type Kind: Copy + PartialEq + Eq;

    /// Returns the priority of this item.
    fn Rank(&self) -> Self::Kind;
}
20
/// The priority lanes understood by the generic queue.
///
/// Each variant corresponds to one of the deques managed by the system: one
/// global injector and one local deque per worker exist for every lane.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Priority {
    /// Served before every other lane.
    High,

    /// Served after `High` and before `Low`.
    Normal,

    /// Served only when no higher lane yielded a task.
    Low,
}
31
32/// Holds the queue components that are safe to share across all threads.
33///
34/// This includes global injectors for submitting new tasks from any context and
35/// stealers for taking tasks from other workers' deques, organized by priority
36/// level.
37pub struct Share<TTask> {
38 /// Global, multi-producer queues for each priority level.
39 pub Injector:(Injector<TTask>, Injector<TTask>, Injector<TTask>),
40
41 /// Shared handles for stealing tasks from each worker's local queue.
42 pub Stealer:(Vec<Stealer<TTask>>, Vec<Stealer<TTask>>, Vec<Stealer<TTask>>),
43}
44
45/// A generic, priority-aware, work-stealing queue.
46///
47/// This is the public-facing entry point for submitting tasks. It is generic
48/// over any task type `TTask` that implements the `Prioritized` trait.
49pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
50 /// A shared, thread-safe pointer to the queue's shared components.
51 Share:Arc<Share<TTask>>,
52}
53
54/// Contains all necessary components for a single worker thread to operate.
55///
56/// This includes the thread-local `Worker` deques, which are not safe to share,
57
58/// making this `Context` object the sole owner of a worker's private queues.
59pub struct Context<TTask> {
60 /// A unique identifier for the worker, used to avoid self-stealing.
61 pub Identifier:usize,
62
63 /// Thread-local work queues for each priority level.
64 pub Local:(Worker<TTask>, Worker<TTask>, Worker<TTask>),
65
66 /// A reference to the shared components of the entire queue system.
67 pub Share:Arc<Share<TTask>>,
68}
69
70impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
71 /// Creates a complete work-stealing queue system.
72 ///
73 /// This function initializes all the necessary queues, both shared and
74 /// thread-local, for a given number of workers.
75 ///
76 /// # Returns
77 ///
78 /// A tuple containing:
79 /// 1. The public-facing `StealingQueue` for submitting new tasks.
80 /// 2. A `Vec` of `Context` objects, one for each worker thread to own.
81 pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
82 let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
83
84 let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
85
86 let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
87
88 // For each priority level, create a thread-local worker queue and its
89 // corresponding shared stealer.
90 let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
91 .map(|_| {
92 let Worker = Worker::new_fifo();
93
94 let Stealer = Worker.stealer();
95
96 High.push(Worker);
97
98 Stealer
99 })
100 .collect();
101
102 let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
103 .map(|_| {
104 let Worker = Worker::new_fifo();
105
106 let Stealer = Worker.stealer();
107
108 Normal.push(Worker);
109
110 Stealer
111 })
112 .collect();
113
114 let StealerLow:Vec<Stealer<TTask>> = (0..Count)
115 .map(|_| {
116 let Worker = Worker::new_fifo();
117
118 let Stealer = Worker.stealer();
119
120 Low.push(Worker);
121
122 Stealer
123 })
124 .collect();
125
126 // Bundle all shared components into an Arc for safe sharing.
127 let Share = Arc::new(Share {
128 Injector:(Injector::new(), Injector::new(), Injector::new()),
129
130 Stealer:(StealerHigh, StealerNormal, StealerLow),
131 });
132
133 // Create a unique context for each worker, giving it ownership of its
134 // local queues and a reference to the shared components.
135 let mut Contexts = Vec::with_capacity(Count);
136
137 for Identifier in 0..Count {
138 Contexts.push(Context {
139 Identifier,
140
141 Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
142
143 Share:Share.clone(),
144 });
145 }
146
147 let Queue = Self { Share };
148
149 (Queue, Contexts)
150 }
151
152 /// Submits a new task to the appropriate global queue based on its
153 /// priority. This method is thread-safe and can be called from any
154 /// context.
155 pub fn Submit(&self, Task:TTask) {
156 match Task.Rank() {
157 Priority::High => self.Share.Injector.0.push(Task),
158
159 Priority::Normal => self.Share.Injector.1.push(Task),
160
161 Priority::Low => self.Share.Injector.2.push(Task),
162 }
163 }
164}
165
166impl<TTask> Context<TTask> {
167 /// Finds the next available task for the worker to execute.
168 ///
169 /// This method implements the complete work-finding logic:
170 /// 1. Check local deques (from high to low priority).
171 /// 2. If local deques are empty, attempt to steal from the system (from
172 /// high to low priority).
173 pub fn Next(&self) -> Option<TTask> {
174 self.Local
175 .0
176 .pop()
177 .or_else(|| self.Local.1.pop())
178 .or_else(|| self.Local.2.pop())
179 .or_else(|| self.Steal(&self.Share.Injector.0, &self.Share.Stealer.0, &self.Local.0))
180 .or_else(|| self.Steal(&self.Share.Injector.1, &self.Share.Stealer.1, &self.Local.1))
181 .or_else(|| self.Steal(&self.Share.Injector.2, &self.Share.Stealer.2, &self.Local.2))
182 }
183
184 /// Attempts to steal a task from a specific priority set.
185 ///
186 /// It first tries to steal a batch from the global injector queue for that
187 /// priority. If that fails, it attempts to steal from a randomly chosen
188 /// peer worker to ensure fair distribution and avoid contention hotspots.
189 pub fn Steal<'a>(
190 &self,
191
192 Injector:&'a Injector<TTask>,
193
194 Stealers:&'a [Stealer<TTask>],
195
196 Local:&'a Worker<TTask>,
197 ) -> Option<TTask> {
198 // First, try to steal a batch from the global injector.
199 // `steal_batch_and_pop` is efficient: it moves a batch into our local
200 // queue and returns one task immediately if successful.
201 if let Steal::Success(Task) = Injector.steal_batch_and_pop(Local) {
202 return Some(Task);
203 }
204
205 // If the global queue is empty, try stealing from peers.
206 let Count = Stealers.len();
207
208 if Count <= 1 {
209 // Cannot steal if there are no other workers.
210 return None;
211 }
212
213 // Allocation-free random iteration: pick a random starting point.
214 let mut Rng = rand::rng();
215
216 let Start = Rng.random_range(0..Count);
217
218 // Iterate through all peers starting from the random offset.
219 for i in 0..Count {
220 let Index = (Start + i) % Count;
221
222 // Don't steal from ourselves.
223 if Index == self.Identifier {
224 continue;
225 }
226
227 if let Steal::Success(Task) = Stealers[Index].steal_batch_and_pop(Local) {
228 return Some(Task);
229 }
230 }
231
232 None
233 }
234}