scheduler.rs

use crate::{
    context::ExecutionContext,
    executor::{ExecuteStatus, OutputHandle, Stack},
    ready_queue::{cpu_rq, local_rq},
    run::{Contexted, Run},
    task::{Task, TaskAdapter, TaskHandle},
};
use alloc::sync::Arc;
use core::{
    mem::forget,
    ptr::NonNull,
    sync::atomic::{compiler_fence, Ordering},
    task::Waker,
};
use eonix_hal::processor::halt;
use eonix_log::println_trace;
use eonix_preempt::assert_preempt_count_eq;
use eonix_sync::{LazyLock, Spin, SpinIrq as _};
use intrusive_collections::RBTree;
use pointers::BorrowedArc;

#[eonix_percpu::define_percpu]
static CURRENT_TASK: Option<NonNull<Task>> = None;

#[eonix_percpu::define_percpu]
static LOCAL_SCHEDULER_CONTEXT: ExecutionContext = ExecutionContext::new();

static TASKS: LazyLock<Spin<RBTree<TaskAdapter>>> =
    LazyLock::new(|| Spin::new(RBTree::new(TaskAdapter::new())));

pub struct Scheduler;

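/// A handle to the output of a spawned task, returned by [`Scheduler::spawn`].
/// [`join`](JoinHandle::join) waits until the task has produced its output and
/// then returns it (see the usage sketch on [`Scheduler::spawn`]).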
pub struct JoinHandle<Output>(Arc<Spin<OutputHandle<Output>>>)
where
    Output: Send;

impl Task {
    pub fn current<'a>() -> BorrowedArc<'a, Task> {
        unsafe {
            // SAFETY:
            // We should never "inspect" a change in the current task.
            // `CURRENT_TASK` is only changed inside the scheduler, and if we are preempted,
            // it will be the same and remain valid when we DO return here.
            BorrowedArc::from_raw(CURRENT_TASK.get().expect("Current task should be present"))
        }
    }
}

impl<O> JoinHandle<O>
where
    O: Send,
{
    pub fn join(self) -> O {
        let Self(output) = self;
        let mut waker = Some(Waker::from(Task::current().clone()));

        loop {
            let mut locked = output.lock();

            match locked.try_resolve() {
                Some(output) => break output,
                None => {
                    if let Some(waker) = waker.take() {
                        locked.register_waiter(waker);
                    }
                }
            }
        }
    }
}

impl Scheduler {
    /// `Scheduler` might be used in various places. Do not hold it for a long time.
    ///
    /// # Safety
    /// The lock returned by this function should be locked with `lock_irq` to prevent
    /// rescheduling during access to the scheduler. Disabling preemption will do the same.
    ///
    /// Drop the lock before calling `schedule`.
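    ///
    /// A usage sketch (the run-queue access shown here is illustrative only):
    ///
    /// ```ignore
    /// let scheduler = Scheduler::get();
    /// let mut rq = cpu_rq(0).lock_irq(); // IRQs off: no rescheduling while the guard is held
    /// // ... short, bounded access through `scheduler` and `rq` ...
    /// drop(rq); // release the guard before calling `Scheduler::schedule()`
    /// ```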
    pub fn get() -> &'static Self {
        static GLOBAL_SCHEDULER: Scheduler = Scheduler;
        &GLOBAL_SCHEDULER
    }

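    /// Set up this CPU's scheduler context on a freshly allocated stack and
    /// point it at [`local_scheduler`].
    ///
    /// A boot-time sketch (`KernelStack` is a hypothetical type implementing
    /// [`Stack`]):
    ///
    /// ```ignore
    /// Scheduler::init_local_scheduler::<KernelStack>();
    /// ```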
    pub fn init_local_scheduler<S>()
    where
        S: Stack,
    {
        let stack = S::new();

        unsafe {
            eonix_preempt::disable();

            // SAFETY: Preemption is disabled.
            let context: &mut ExecutionContext = LOCAL_SCHEDULER_CONTEXT.as_mut();

            context.set_ip(local_scheduler as _);
            context.set_sp(stack.get_bottom().addr().get() as usize);
            context.set_interrupt(true);

            eonix_preempt::enable();
        }

        // Leak the stack handle: the scheduler context keeps using the memory.
        forget(stack);
    }

    /// # Safety
    /// This function must not be called inside the scheduler context.
    ///
    /// The caller must ensure that `preempt::count` == 1.
    pub unsafe fn go_from_scheduler(to: &ExecutionContext) {
        // SAFETY: Preemption is disabled.
        unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref() }.switch_to(to);
    }

    /// # Safety
    /// This function must not be called inside the scheduler context.
    ///
    /// The caller must ensure that `preempt::count` == 1.
    pub unsafe fn goto_scheduler(from: &ExecutionContext) {
        // SAFETY: Preemption is disabled.
        from.switch_to(unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref() });
    }

    /// # Safety
    /// This function must not be called inside the scheduler context.
    ///
    /// The caller must ensure that `preempt::count` == 1.
    pub unsafe fn goto_scheduler_noreturn() -> ! {
        // SAFETY: Preemption is disabled.
        unsafe { LOCAL_SCHEDULER_CONTEXT.as_ref().switch_noreturn() }
    }

    fn add_task(task: Arc<Task>) {
        TASKS.lock().insert(task);
    }

    fn remove_task(task: &Task) {
        unsafe { TASKS.lock().cursor_mut_from_ptr(task as *const _).remove() };
    }

    fn select_cpu_for_task(&self, task: &Task) -> usize {
        task.cpu.load(Ordering::Relaxed) as _
    }

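    /// Make `task` runnable. If the task is not on any run queue, select a CPU
    /// and enqueue it there; if it is (or was just) on a run queue, re-check
    /// under the run-queue lock and re-enqueue it only if it has just been
    /// taken off.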
    pub fn activate(&self, task: &Arc<Task>) {
        // Only one cpu can be activating the task at a time.
        // TODO: Add some checks.
        if task.on_rq.swap(true, Ordering::Acquire) {
            // Lock the rq and check again whether the task is on the rq.
            let cpuid = task.cpu.load(Ordering::Acquire);
            let mut rq = cpu_rq(cpuid as _).lock_irq();

            if !task.on_rq.load(Ordering::Acquire) {
                // The task has just been taken off the rq. Put it back.
                rq.put(task.clone());
            } else {
                // The task is already on the rq. Do nothing.
                return;
            }
        } else {
            // The task is not on any rq. Select one and put the task there.
            let cpu = self.select_cpu_for_task(&task);
            let mut rq = cpu_rq(cpu).lock_irq();

            task.cpu.store(cpu as _, Ordering::Release);
            rq.put(task.clone());
        }
    }

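    /// Create a new task running `runnable` on a fresh stack of type `S`,
    /// register it in the global task tree, and make it runnable.
    ///
    /// A minimal sketch (`KernelStack` and `my_runnable` are hypothetical
    /// placeholders for a concrete [`Stack`] type and a [`Run`] value):
    ///
    /// ```ignore
    /// let handle = Scheduler::get().spawn::<KernelStack, _>(my_runnable);
    /// let output = handle.join();
    /// ```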
    pub fn spawn<S, R>(&self, runnable: R) -> JoinHandle<R::Output>
    where
        S: Stack + 'static,
        R: Run + Contexted + Send + 'static,
        R::Output: Send + 'static,
    {
        let TaskHandle {
            task,
            output_handle,
        } = Task::new::<S, _>(runnable);

        Self::add_task(task.clone());
        self.activate(&task);

        JoinHandle(output_handle)
    }

    /// Go to the idle task. Call this with `preempt_count == 1`.
    /// The preempt count will be decremented by this function.
    ///
    /// # Safety
    /// We might never return from here.
    /// Drop all variables that take ownership of some resource before calling this function.
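    ///
    /// A typical call-site sketch (the state transition is elided because it
    /// depends on the caller):
    ///
    /// ```ignore
    /// eonix_preempt::disable();
    /// // ... mark the current task as sleeping/parked here ...
    /// Scheduler::schedule();
    /// // When the task is picked to run again, execution resumes here
    /// // with preemption enabled.
    /// ```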
    pub fn schedule() {
        assert_preempt_count_eq!(1, "Scheduler::schedule");

        // Make sure all work is done before scheduling.
        compiler_fence(Ordering::SeqCst);

        // TODO!!!!!: Use of reference here needs further consideration.
        //
        // Since we might never return to here, we can't take ownership of `current()`.
        // Is it safe to believe that `current()` will never change across calls?
        unsafe {
            // SAFETY: Preemption is disabled.
            Scheduler::goto_scheduler(&Task::current().execution_context);
        }

        eonix_preempt::enable();
    }
}

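/// Per-CPU scheduler loop entered via [`LOCAL_SCHEDULER_CONTEXT`]. On each
/// iteration it locks the local run queue, decides between the previous task
/// and the next runnable task (halting the CPU when there is nothing to run),
/// runs the chosen task, and removes it from the global task tree once it
/// reports [`ExecuteStatus::Finished`].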
  181. extern "C" fn local_scheduler() -> ! {
  182. loop {
  183. assert_preempt_count_eq!(1, "Scheduler::idle_task");
  184. let mut rq = local_rq().lock_irq();
  185. let previous_task = CURRENT_TASK
  186. .get()
  187. .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) });
  188. let next_task = rq.get();
  189. match (previous_task, next_task) {
  190. (None, None) => {
  191. // Nothing to do, halt the cpu and rerun the loop.
  192. drop(rq);
  193. halt();
  194. continue;
  195. }
  196. (None, Some(next)) => {
  197. CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
  198. }
  199. (Some(previous), None) => {
  200. if previous.state.is_running() {
  201. // Previous thread is `Running`, return to the current running thread.
  202. println_trace!(
  203. "trace_scheduler",
  204. "Returning to task id({}) without doing context switch",
  205. previous.id
  206. );
  207. CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
  208. } else {
  209. // Nothing to do, halt the cpu and rerun the loop.
  210. CURRENT_TASK.set(NonNull::new(Arc::into_raw(previous) as *mut _));
  211. drop(rq);
  212. halt();
  213. continue;
  214. }
  215. }
  216. (Some(previous), Some(next)) => {
  217. println_trace!(
  218. "trace_scheduler",
  219. "Switching from task id({}) to task id({})",
  220. previous.id,
  221. next.id
  222. );
  223. debug_assert_ne!(previous.id, next.id, "Switching to the same task");
  224. if previous.state.is_running() || !previous.state.try_park() {
  225. rq.put(previous);
  226. } else {
  227. previous.on_rq.store(false, Ordering::Release);
  228. }
  229. CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
  230. }
  231. }
  232. drop(rq);
  233. // TODO: We can move the release of finished tasks to some worker thread.
  234. if let ExecuteStatus::Finished = Task::current().run() {
  235. let current = CURRENT_TASK
  236. .swap(None)
  237. .map(|ptr| unsafe { Arc::from_raw(ptr.as_ptr()) })
  238. .expect("Current task should be present");
  239. Scheduler::remove_task(&current);
  240. }
  241. }
  242. }