Echo/Scheduler/
Scheduler.rs1#![allow(non_snake_case, non_camel_case_types)]
7
8use std::{
9 collections::HashMap,
10 future::Future,
11 sync::{
12 Arc,
13 atomic::{AtomicBool, Ordering},
14 },
15};
16
17use log::{error, info, warn};
18use tokio::task::JoinHandle;
19
20use super::{SchedulerBuilder::Concurrency, Worker::Worker};
21use crate::{
22 Queue::StealingQueue::StealingQueue,
23 Task::{Priority::Priority, Task::Task},
24};
25
26pub struct Scheduler {
29 Queue:StealingQueue<Task>,
31
32 WorkerHandles:Vec<JoinHandle<()>>,
34
35 IsRunning:Arc<AtomicBool>,
37}
38
39impl Scheduler {
40 pub(crate) fn Create(WorkerCount:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
45 info!("[Scheduler] Creating scheduler with {} workers.", WorkerCount);
46
47 let IsRunning = Arc::new(AtomicBool::new(true));
48
49 let (Queue, Contexts) = StealingQueue::<Task>::Create(WorkerCount);
51
52 let mut WorkerHandles = Vec::with_capacity(WorkerCount);
53
54 for Context in Contexts.into_iter() {
56 let IsRunning = IsRunning.clone();
57
58 let WorkerHandle = tokio::spawn(async move {
59 Worker::Create(Context, IsRunning).Run().await;
61 });
62
63 WorkerHandles.push(WorkerHandle);
64 }
65
66 Self { Queue, WorkerHandles, IsRunning }
67 }
68
69 pub fn Submit<F>(&self, Operation:F, Priority:Priority)
74 where
75 F: Future<Output = ()> + Send + 'static, {
76 self.Queue.Submit(Task::Create(Operation, Priority));
77 }
78
79 pub async fn Stop(&mut self) {
84 if !self.IsRunning.swap(false, Ordering::Relaxed) {
85 info!("[Scheduler] Stop already initiated.");
86
87 return;
88 }
89
90 info!("[Scheduler] Stopping worker threads...");
91
92 for Handle in self.WorkerHandles.drain(..) {
93 if let Err(Error) = Handle.await {
94 error!("[Scheduler] Error joining worker handle: {}", Error);
95 }
96 }
97
98 info!("[Scheduler] All workers stopped successfully.");
99 }
100}
101
102impl Drop for Scheduler {
103 fn drop(&mut self) {
109 if self.IsRunning.load(Ordering::Relaxed) {
110 warn!("[Scheduler] Dropped without explicit stop. Signaling workers to terminate.");
111
112 self.IsRunning.store(false, Ordering::Relaxed);
113 }
114 }
115}