//! task.rs — the schedulable `Task`, its `TaskHandle`, and the park/unpark primitives.
  1. mod adapter;
  2. mod task_state;
  3. use crate::{
  4. context::ExecutionContext,
  5. executor::{ExecuteStatus, Executor, ExecutorBuilder, OutputHandle, Stack},
  6. run::{Contexted, Run},
  7. scheduler::Scheduler,
  8. };
  9. use alloc::{boxed::Box, sync::Arc, task::Wake};
  10. use atomic_unique_refcell::AtomicUniqueRefCell;
  11. use core::{
  12. pin::{pin, Pin},
  13. sync::atomic::{AtomicBool, AtomicU32, Ordering},
  14. task::{Context, Poll, Waker},
  15. };
  16. use eonix_hal::processor::CPU;
  17. use eonix_preempt::assert_preempt_enabled;
  18. use eonix_sync::Spin;
  19. use intrusive_collections::RBTreeAtomicLink;
  20. use task_state::TaskState;
  21. pub use adapter::TaskAdapter;
  /// Numeric identifier of a task.
  ///
  /// A thin newtype over `u32`; equality and ordering are those of the wrapped
  /// integer.
  #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
  pub struct TaskId(u32);
  24. pub struct TaskHandle<Output>
  25. where
  26. Output: Send,
  27. {
  28. pub(crate) task: Arc<Task>,
  29. pub(crate) output_handle: Arc<Spin<OutputHandle<Output>>>,
  30. }
  31. /// A `Task` represents a schedulable unit.
  32. ///
  33. /// Initial: state = Running, unparked = false
  34. ///
  35. /// Task::park() => swap state <- Parking, assert prev == Running
  36. /// => swap unparked <- false
  37. /// -> true => store state <- Running => return
  38. /// -> false => goto scheduler => get rq lock => load state
  39. /// -> Running => enqueue
  40. /// -> Parking => cmpxchg Parking -> Parked
  41. /// -> Running => enqueue
  42. /// -> Parking => on_rq <- false
  43. /// -> Parked => ???
  44. ///
  45. /// Task::unpark() => swap unparked <- true
  46. /// -> true => return
  47. /// -> false => swap state <- Running
  48. /// -> Running => return
  49. /// -> Parking | Parked => Scheduler::activate
  50. pub struct Task {
  51. /// Unique identifier of the task.
  52. pub id: TaskId,
  53. /// Whether the task is on some run queue (a.k.a ready).
  54. pub(crate) on_rq: AtomicBool,
  55. /// Whether someone has called `unpark` on this task.
  56. pub(crate) unparked: AtomicBool,
  57. /// The last cpu that the task was executed on.
  58. /// If `on_rq` is `false`, we can't assume that this task is still on the cpu.
  59. pub(crate) cpu: AtomicU32,
  60. /// Task state.
  61. pub(crate) state: TaskState,
  62. /// Task execution context.
  63. pub(crate) execution_context: ExecutionContext,
  64. /// Executor object.
  65. executor: AtomicUniqueRefCell<Option<Pin<Box<dyn Executor>>>>,
  66. /// Link in the global task list.
  67. link_task_list: RBTreeAtomicLink,
  68. }
  69. impl Task {
  70. pub fn new<S, R>(runnable: R) -> TaskHandle<R::Output>
  71. where
  72. S: Stack + 'static,
  73. R: Run + Contexted + Send + 'static,
  74. R::Output: Send + 'static,
  75. {
  76. static ID: AtomicU32 = AtomicU32::new(0);
  77. let (executor, execution_context, output) = ExecutorBuilder::new()
  78. .stack(S::new())
  79. .runnable(runnable)
  80. .build();
  81. let task = Arc::new(Self {
  82. id: TaskId(ID.fetch_add(1, Ordering::Relaxed)),
  83. on_rq: AtomicBool::new(false),
  84. unparked: AtomicBool::new(false),
  85. cpu: AtomicU32::new(CPU::local().cpuid() as u32),
  86. state: TaskState::new(TaskState::RUNNING),
  87. executor: AtomicUniqueRefCell::new(Some(executor)),
  88. execution_context,
  89. link_task_list: RBTreeAtomicLink::new(),
  90. });
  91. TaskHandle {
  92. task,
  93. output_handle: output,
  94. }
  95. }
  96. pub fn run(&self) -> ExecuteStatus {
  97. let mut executor_borrow = self.executor.borrow();
  98. let executor = executor_borrow
  99. .as_ref()
  100. .expect("Executor should be present")
  101. .as_ref()
  102. .get_ref();
  103. if let ExecuteStatus::Finished = executor.progress() {
  104. executor_borrow.take();
  105. ExecuteStatus::Finished
  106. } else {
  107. ExecuteStatus::Executing
  108. }
  109. }
  110. pub fn unpark(self: &Arc<Self>) {
  111. if self.unparked.swap(true, Ordering::Release) {
  112. return;
  113. }
  114. eonix_preempt::disable();
  115. match self.state.swap(TaskState::RUNNING) {
  116. TaskState::RUNNING => {}
  117. TaskState::PARKED | TaskState::PARKING => {
  118. // We are waking up from sleep or someone else is parking this task.
  119. // Try to wake it up.
  120. Scheduler::get().activate(self);
  121. }
  122. _ => unreachable!(),
  123. }
  124. eonix_preempt::enable();
  125. }
  126. pub fn park() {
  127. eonix_preempt::disable();
  128. Self::park_preempt_disabled();
  129. }
  130. /// Park the current task with `preempt::count() == 1`.
  131. pub fn park_preempt_disabled() {
  132. let task = Task::current();
  133. let old_state = task.state.swap(TaskState::PARKING);
  134. assert_eq!(
  135. old_state,
  136. TaskState::RUNNING,
  137. "Parking a task that is not running."
  138. );
  139. if task.unparked.swap(false, Ordering::AcqRel) {
  140. // Someone has called `unpark` on this task previously.
  141. task.state.swap(TaskState::RUNNING);
  142. } else {
  143. unsafe {
  144. // SAFETY: Preemption is disabled.
  145. Scheduler::goto_scheduler(&Task::current().execution_context)
  146. };
  147. assert!(task.unparked.swap(false, Ordering::Acquire));
  148. }
  149. eonix_preempt::enable();
  150. }
  151. pub fn block_on<F>(future: F) -> F::Output
  152. where
  153. F: Future,
  154. {
  155. assert_preempt_enabled!("block_on() must be called with preemption enabled");
  156. let waker = Waker::from(Task::current().clone());
  157. let mut context = Context::from_waker(&waker);
  158. let mut future = pin!(future);
  159. loop {
  160. if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
  161. break output;
  162. }
  163. Task::park();
  164. }
  165. }
  166. }
  167. impl Wake for Task {
  168. fn wake(self: Arc<Self>) {
  169. self.wake_by_ref();
  170. }
  171. fn wake_by_ref(self: &Arc<Self>) {
  172. self.unpark();
  173. }
  174. }