scheduler.rs 6.5 KB


  1. use crate::{
  2. executor::OutputHandle,
  3. ready_queue::{local_rq, ReadyQueue},
  4. task::{Task, TaskAdapter, TaskHandle, TaskState},
  5. };
  6. use alloc::{sync::Arc, task::Wake};
  7. use core::{
  8. ops::{Deref, DerefMut},
  9. ptr::NonNull,
  10. task::{Context, Poll, Waker},
  11. };
  12. use eonix_hal::processor::halt;
  13. use eonix_log::println_trace;
  14. use eonix_sync::{LazyLock, Spin, SpinIrq as _};
  15. use intrusive_collections::RBTree;
  16. use pointers::BorrowedArc;
  17. #[eonix_percpu::define_percpu]
  18. static CURRENT_TASK: Option<NonNull<Task>> = None;
  19. static TASKS: LazyLock<Spin<RBTree<TaskAdapter>>> =
  20. LazyLock::new(|| Spin::new(RBTree::new(TaskAdapter::new())));
  21. pub static RUNTIME: Runtime = Runtime();
  22. pub struct Runtime();
  23. pub struct JoinHandle<Output>(Arc<Spin<OutputHandle<Output>>>)
  24. where
  25. Output: Send;
  26. impl Task {
  27. pub fn current<'a>() -> BorrowedArc<'a, Task> {
  28. unsafe {
  29. // SAFETY:
  30. // We should never "inspect" a change in `current`.
  31. // The change of `CURRENT` will only happen in the scheduler. And if we are preempted,
  32. // when we DO return, the `CURRENT` will be the same and remain valid.
  33. BorrowedArc::from_raw(CURRENT_TASK.get().expect("Current task should be present"))
  34. }
  35. }
  36. }
  37. impl<O> JoinHandle<O>
  38. where
  39. O: Send,
  40. {
  41. pub fn join(self) -> O {
  42. let Self(output) = self;
  43. let mut waker = Some(Waker::from(Task::current().clone()));
  44. loop {
  45. let mut locked = output.lock();
  46. match locked.try_resolve() {
  47. Some(output) => break output,
  48. None => {
  49. if let Some(waker) = waker.take() {
  50. locked.register_waiter(waker);
  51. }
  52. }
  53. }
  54. }
  55. }
  56. }
  57. impl Runtime {
  58. pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
  59. where
  60. F: Future + Send + 'static,
  61. F::Output: Send + 'static,
  62. {
  63. let TaskHandle {
  64. task,
  65. output_handle,
  66. } = Task::new(future);
  67. self.add_task(task.clone());
  68. task.wake_by_ref();
  69. JoinHandle(output_handle)
  70. }
  71. fn add_task(&self, task: Arc<Task>) {
  72. TASKS.lock_irq().insert(task);
  73. }
  74. fn remove_task(&self, task: &impl Deref<Target = Arc<Task>>) {
  75. unsafe {
  76. TASKS
  77. .lock_irq()
  78. .cursor_mut_from_ptr(Arc::as_ptr(task))
  79. .remove();
  80. }
  81. }
  82. fn current(&self) -> Option<BorrowedArc<Task>> {
  83. CURRENT_TASK
  84. .get()
  85. .map(|ptr| unsafe { BorrowedArc::from_raw(ptr) })
  86. }
  87. fn remove_and_enqueue_current(&self, rq: &mut impl DerefMut<Target = dyn ReadyQueue>) {
  88. let Some(current) = CURRENT_TASK
  89. .swap(None)
  90. .map(|cur| unsafe { Arc::from_raw(cur.as_ptr()) })
  91. else {
  92. return;
  93. };
  94. match current.state.update(|state| match state {
  95. TaskState::READY_RUNNING => Some(TaskState::READY),
  96. TaskState::RUNNING => Some(TaskState::BLOCKED),
  97. _ => {
  98. unreachable!("Current task should be at least in RUNNING state, but got {state:?}")
  99. }
  100. }) {
  101. Ok(TaskState::READY_RUNNING) => {
  102. println_trace!(
  103. "trace_scheduler",
  104. "Re-enqueueing task {:?} (CPU{})",
  105. current.id,
  106. eonix_hal::processor::CPU::local().cpuid(),
  107. );
  108. rq.put(current);
  109. }
  110. Ok(_) => {
  111. println_trace!(
  112. "trace_scheduler",
  113. "Current task {:?} (CPU{}) is blocked, not re-enqueueing",
  114. current.id,
  115. eonix_hal::processor::CPU::local().cpuid(),
  116. );
  117. }
  118. _ => unreachable!(),
  119. }
  120. }
  121. pub fn block_till_woken(set_waker: impl FnOnce(&Waker)) -> impl Future<Output = ()> {
  122. struct BlockTillWoken<F: FnOnce(&Waker)> {
  123. set_waker: Option<F>,
  124. slept: bool,
  125. }
  126. impl<F: FnOnce(&Waker)> Future for BlockTillWoken<F> {
  127. type Output = ();
  128. fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
  129. if self.slept {
  130. Poll::Ready(())
  131. } else {
  132. let (set_waker, slept) = unsafe {
  133. let me = self.get_unchecked_mut();
  134. (me.set_waker.take().unwrap(), &mut me.slept)
  135. };
  136. set_waker(cx.waker());
  137. *slept = true;
  138. Poll::Pending
  139. }
  140. }
  141. }
  142. BlockTillWoken {
  143. set_waker: Some(set_waker),
  144. slept: false,
  145. }
  146. }
  147. /// Enter the runtime with an "init" future and run till its completion.
  148. ///
  149. /// The "init" future has the highest priority and when it completes,
  150. /// the runtime will exit immediately and yield its output.
  151. pub fn enter(&self) {
  152. loop {
  153. let mut rq = local_rq().lock_irq();
  154. self.remove_and_enqueue_current(&mut rq);
  155. let Some(next) = rq.get() else {
  156. drop(rq);
  157. halt();
  158. continue;
  159. };
  160. println_trace!(
  161. "trace_scheduler",
  162. "Switching to task {:?} (CPU{})",
  163. next.id,
  164. eonix_hal::processor::CPU::local().cpuid(),
  165. );
  166. let old_state = next.state.swap(TaskState::RUNNING);
  167. assert_eq!(
  168. old_state,
  169. TaskState::READY,
  170. "Next task should be in READY state"
  171. );
  172. unsafe {
  173. CURRENT_TASK.set(Some(NonNull::new_unchecked(Arc::into_raw(next) as *mut _)));
  174. }
  175. drop(rq);
  176. // TODO: MAYBE we can move the release of finished tasks to some worker thread.
  177. if Task::current().poll().is_ready() {
  178. let old_state = Task::current().state.swap(TaskState::DEAD);
  179. assert!(
  180. old_state & TaskState::RUNNING != 0,
  181. "Current task should be at least in RUNNING state"
  182. );
  183. println_trace!(
  184. "trace_scheduler",
  185. "Task {:?} finished execution, removing...",
  186. Task::current().id,
  187. );
  188. self.remove_task(&Task::current());
  189. CURRENT_TASK.set(None);
  190. }
  191. }
  192. }
  193. }