scheduler.rs

use crate::{
    executor::OutputHandle,
    ready_queue::{cpu_rq, local_rq, ReadyQueue},
    task::{Task, TaskAdapter, TaskHandle, TaskState},
};
use alloc::sync::Arc;
use core::{
    ops::{Deref, DerefMut},
    ptr::NonNull,
    sync::atomic::Ordering,
    task::Waker,
};
use eonix_hal::processor::halt;
use eonix_sync::{LazyLock, Spin, SpinIrq as _};
use intrusive_collections::RBTree;
use pointers::BorrowedArc;
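
/// Per-CPU slot holding a raw pointer to the task currently running on this
/// CPU, or `None` while the CPU is idle.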
#[eonix_percpu::define_percpu]
static CURRENT_TASK: Option<NonNull<Task>> = None;
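
/// Global registry of all live tasks, kept in an intrusive red-black tree.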
static TASKS: LazyLock<Spin<RBTree<TaskAdapter>>> =
    LazyLock::new(|| Spin::new(RBTree::new(TaskAdapter::new())));
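
/// The global scheduler runtime shared by all CPUs.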
pub static RUNTIME: Runtime = Runtime();

pub struct Runtime();
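
/// Handle returned by [`Runtime::spawn`] that can be used to wait for the
/// spawned task's output.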
pub struct JoinHandle<Output>(Arc<Spin<OutputHandle<Output>>>)
where
    Output: Send;

impl Task {
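    /// Borrow the task currently running on this CPU.
    ///
    /// Panics if called while no current task is installed.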
    pub fn current<'a>() -> BorrowedArc<'a, Task> {
        unsafe {
            // SAFETY:
            // We never "inspect" a change of the current task here.
            // `CURRENT_TASK` is only changed by the scheduler, and if we are preempted,
            // it will hold the same, still valid pointer by the time we return.
            BorrowedArc::from_raw(CURRENT_TASK.get().expect("Current task should be present"))
        }
    }
}

impl<O> JoinHandle<O>
where
    O: Send,
{
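    /// Wait for the spawned task to finish and return its output.
    ///
    /// The current task is registered as a waiter on the first pass; the
    /// output handle is then polled until the output becomes available.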
    pub fn join(self) -> O {
        let Self(output) = self;
        let mut waker = Some(Waker::from(Task::current().clone()));

        loop {
            let mut locked = output.lock();
            match locked.try_resolve() {
                Some(output) => break output,
                None => {
                    if let Some(waker) = waker.take() {
                        locked.register_waiter(waker);
                    }
                }
            }
        }
    }
}

impl Runtime {
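    /// Choose the CPU whose ready queue a task should be placed on.
    /// Currently this simply reuses the CPU the task last ran on.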
    fn select_cpu_for_task(&self, task: &Task) -> usize {
        task.cpu.load(Ordering::Relaxed) as _
    }
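
    /// Make `task` runnable by putting it on a ready queue, unless it is
    /// already queued on one.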
    pub fn activate(&self, task: &Arc<Task>) {
        // Only one cpu can be activating the task at a time.
        // TODO: Add some checks.
        if task.on_rq.swap(true, Ordering::Acquire) {
            // Lock the rq and re-check whether the task is still on it.
            let cpuid = task.cpu.load(Ordering::Acquire);
            let mut rq = cpu_rq(cpuid as _).lock_irq();
            if !task.on_rq.load(Ordering::Acquire) {
                // The task has just been taken off the rq. Put it back.
                rq.put(task.clone());
            } else {
                // The task is already on the rq. Do nothing.
                return;
            }
        } else {
            // The task is not on any rq. Select one and put it there.
            let cpu = self.select_cpu_for_task(&task);
            let mut rq = cpu_rq(cpu).lock_irq();
            task.cpu.store(cpu as _, Ordering::Release);
            rq.put(task.clone());
        }
    }
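
    /// Create a new task that drives `future` to completion, register it with
    /// the runtime and make it runnable.
    ///
    /// Returns a [`JoinHandle`] that yields the future's output.
    ///
    /// # Example (sketch; assumes a kernel context where the runtime is running)
    /// ```ignore
    /// let handle = RUNTIME.spawn(async { 40 + 2 });
    /// assert_eq!(handle.join(), 42);
    /// ```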
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let TaskHandle {
            task,
            output_handle,
        } = Task::new(future);

        self.add_task(task.clone());
        self.activate(&task);

        JoinHandle(output_handle)
    }

    // /// Go to the idle task. Call this with `preempt_count == 1`.
    // /// The preempt count will be decremented by this function.
    // ///
    // /// # Safety
    // /// We might never return from here.
    // /// Drop all variables that take ownership of some resource before calling this function.
    // pub fn schedule() {
    //     assert_preempt_count_eq!(1, "Scheduler::schedule");
    //     // Make sure all work is done before scheduling.
    //     compiler_fence(Ordering::SeqCst);
    //     // TODO!!!!!: Use of reference here needs further consideration.
    //     //
    //     // Since we might never return to here, we can't take ownership of `current()`.
    //     // Is it safe to believe that `current()` will never change across calls?
    //     unsafe {
    //         // SAFETY: Preemption is disabled.
    //         Scheduler::goto_scheduler(&Task::current().execution_context);
    //     }
    //     eonix_preempt::enable();
    // }
}

impl Runtime {
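    /// Register a newly created task in the global task tree.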
    fn add_task(&self, task: Arc<Task>) {
        TASKS.lock_irq().insert(task);
    }
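
    /// Remove a finished task from the global task tree.
    ///
    /// The caller must guarantee that `task` is currently linked into the tree,
    /// since the cursor is constructed directly from its pointer.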
    fn remove_task(&self, task: &impl Deref<Target = Arc<Task>>) {
        unsafe {
            TASKS
                .lock_irq()
                .cursor_mut_from_ptr(Arc::as_ptr(task))
                .remove();
        }
    }
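
    /// The task currently running on this CPU, if any.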
    fn current(&self) -> Option<BorrowedArc<Task>> {
        CURRENT_TASK
            .get()
            .map(|ptr| unsafe { BorrowedArc::from_raw(ptr) })
    }
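
    /// Take the current task off the CPU.
    ///
    /// If it is still `RUNNING`, it is marked `READY` and put back on `rq`;
    /// if it has parked itself, it is left off the queue.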
    fn remove_and_enqueue_current(&self, rq: &mut impl DerefMut<Target = dyn ReadyQueue>) {
        let Some(current) = self.current() else {
            return;
        };

        match current.state.cmpxchg(TaskState::RUNNING, TaskState::READY) {
            Ok(_) => {
                let current = unsafe {
                    Arc::from_raw(
                        CURRENT_TASK
                            .get()
                            .expect("Current task should be present")
                            .as_ptr(),
                    )
                };
                rq.put(current);
            }
            Err(old) => {
                assert_eq!(
                    old,
                    TaskState::PARKED,
                    "Current task should be in PARKED state"
                );
            }
        }
    }

    /// Enter the runtime's scheduling loop on the current CPU.
    ///
    /// Each iteration re-enqueues the previously running task (unless it has
    /// parked itself), picks the next `READY` task from the local ready queue,
    /// halting the CPU while the queue is empty, and runs it. Tasks that run
    /// to completion are marked `DEAD` and removed from the task tree.
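    ///
    /// # Example (sketch; assumed per-CPU startup path, not verified)
    /// ```ignore
    /// // After per-CPU initialization, each CPU enters the scheduling loop
    /// // and never returns from it.
    /// RUNTIME.enter();
    /// ```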
    pub fn enter(&self) {
        loop {
            let mut rq = local_rq().lock_irq();
            self.remove_and_enqueue_current(&mut rq);

            let Some(next) = rq.get() else {
                drop(rq);
                halt();
                continue;
            };

            let old_state = next.state.swap(TaskState::RUNNING);
            assert_eq!(
                old_state,
                TaskState::READY,
                "Next task should be in READY state"
            );

            CURRENT_TASK.set(NonNull::new(Arc::into_raw(next) as *mut _));
            drop(rq);

            // TODO: MAYBE we can move the release of finished tasks to some worker thread.
            if Task::current().run() {
                Task::current().state.set(TaskState::DEAD);
                CURRENT_TASK.set(None);
                self.remove_task(&Task::current());
            }
        }
    }
}