1use std::sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8};
9
10use log::trace;
11use tokio::time::{Duration, sleep};
12
13use crate::{Queue::StealingQueue::Context, Task::Task::Task};
14
15pub struct Worker {
17 Context:Context<Task>,
20
21 IsRunning:Arc<AtomicBool>,
23}
24
25impl Worker {
26 pub fn Create(Context:Context<Task>, IsRunning:Arc<AtomicBool>) -> Self { Self { Context, IsRunning } }
29
30 pub async fn Run(self) {
37 trace!("[Worker {}] Starting run loop.", self.Context.Identifier);
38
39 while self.IsRunning.load(Ordering::Relaxed) {
40 let TaskOption = self.PopLocal();
42
43 if let Some(Task) = TaskOption {
44 trace!(
45 "[Worker {}] Executing local task with priority: {:?}.",
46 self.Context.Identifier, Task.Priority
47 );
48
49 Task.Operation.await;
50
51 continue;
52 }
53
54 let TaskOption = self.StealFromSystem();
56
57 if let Some(Task) = TaskOption {
58 trace!(
59 "[Worker {}] Executing stolen task with priority: {:?}.",
60 self.Context.Identifier, Task.Priority
61 );
62
63 Task.Operation.await;
64 } else {
65 sleep(Duration::from_millis(1)).await;
67 }
68 }
69
70 trace!("[Worker {}] Run loop finished.", self.Context.Identifier);
71 }
72
73 fn PopLocal(&self) -> Option<Task> {
76 self.Context
77 .Local
78 .0
79 .pop()
80 .or_else(|| self.Context.Local.1.pop())
81 .or_else(|| self.Context.Local.2.pop())
82 }
83
84 fn StealFromSystem(&self) -> Option<Task> {
89 self.Context
90 .Steal(
91 &self.Context.Share.Injector.0,
92 &self.Context.Share.Stealer.0,
93 &self.Context.Local.0,
94 )
95 .or_else(|| {
96 self.Context.Steal(
97 &self.Context.Share.Injector.1,
98 &self.Context.Share.Stealer.1,
99 &self.Context.Local.1,
100 )
101 })
102 .or_else(|| {
103 self.Context.Steal(
104 &self.Context.Share.Injector.2,
105 &self.Context.Share.Stealer.2,
106 &self.Context.Local.2,
107 )
108 })
109 }
110}